@spec start_link(
name: atom(),
- db_path: charlist() | String.t(),
topologies: list(),
replicas: integer(),
quorum: integer()
def start_link(
name: name,
- db_path: db_path,
topologies: topologies,
replicas: replicas,
quorum: quorum
) do
- Supervisor.start_link(__MODULE__, {name, db_path, topologies, replicas, quorum}, name: name)
+ Supervisor.start_link(__MODULE__, {name, topologies, replicas, quorum}, name: name)
end
- def init({name, db_path, topologies, replicas, quorum}) do
+ def init({name, topologies, replicas, quorum}) do
db = db_name(name)
ring = ring_name(name)
cluster_supervisor = cluster_supervisor_name(name)
ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}
children = [
- {DB, [name: db, db_path: db_path]},
+ {DB, [name: db]},
{Cluster.Supervisor, [topologies, [name: cluster_supervisor]]},
{Ring, [{:local, ring}, ring_data, []]}
]
defdelegate get(name, key), to: Ring
defdelegate put(name, key, value), to: Ring
- defdelegate prefix(name, prefix), to: Ring
+ defdelegate prefix(name, key, split_on, min), to: Ring
end
alias __MODULE__
- @db_options [
- {:create_if_missing, true},
- {:merge_operator, :erlang_merge_operator},
- {:capped_prefix_extractor, 255}
- ]
+ @db_options [:set, :private]
defstruct [:db]
@type t :: %__MODULE__{
- db: :rocksdb.db_handle()
+ db: :ets.tid() | atom()
}
- @spec get(name :: module(), key :: binary()) ::
- {:ok, term()} | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
+ @spec get(name :: module(), key :: binary()) :: [tuple()]
def get(name, key) do
GenServer.call(name, {:get, key})
end
- @spec get_prefix(name :: module(), prefix :: binary()) ::
- [{:ok, term()}] | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
- def get_prefix(name, prefix) do
- GenServer.call(name, {:get_prefix, prefix})
- end
-
@spec put(name :: module(), key :: binary(), value :: term()) :: :ok
def put(name, key, value) do
GenServer.cast(name, {:put, key, value})
end
- def start_link(name: name, db_path: db_path) do
- GenServer.start_link(__MODULE__, db_path, name: name)
- end
-
- def init(db_path) when is_binary(db_path) do
- init(to_charlist(db_path))
+ def start_link(name: name) do
+ GenServer.start_link(__MODULE__, name, name: name)
end
- def init(db_path) when is_list(db_path) do
- Logger.info("Opening database at #{db_path}")
- {:ok, db} = :rocksdb.open(db_path, @db_options)
+ def init(name) do
+ Logger.debug("Opening ETS Table")
+ db = :ets.new(Module.concat([name, ETSTable]), @db_options)
{:ok, %DB{db: db}}
end
def handle_call({:get, key}, _from, %DB{db: db} = state) do
{:reply,
- db
- |> :rocksdb.get(key, [])
- |> deserialize(), state}
- end
-
- def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do
- {:reply, seek_iterator(db, prefix), state}
+ case :ets.lookup(db, key) do
+ [] -> :not_found
+ [other] -> other
+ end, state}
end
def handle_cast({:put, key, value}, %DB{db: db} = state) do
- :rocksdb.put(db, key, serialize(value), [])
+ :ets.insert(db, {key, value})
{:noreply, state}
end
-
- def seek_iterator(db, prefix, acc \\ []) do
- {:ok, itr} = :rocksdb.iterator(db, [])
- seek_loop(:rocksdb.iterator_move(itr, {:seek, prefix}), prefix, itr, acc)
- end
-
- def seek_loop({:error, :iterator_closed}, _pre, _itr, acc), do: acc
- def seek_loop({:error, :invalid_iterator}, _pre, _itr, acc), do: acc
-
- def seek_loop({:ok, key, val}, prefix, itr, acc) do
- case String.starts_with?(key, prefix) do
- true ->
- seek_loop(
- :rocksdb.iterator_move(itr, :next),
- prefix,
- itr,
- [{key, :erlang.binary_to_term(val)} | acc]
- )
-
- false ->
- acc
- end
- end
-
- def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)}
- def deserialize(other), do: other
-
- def serialize(val), do: :erlang.term_to_binary(val)
-
- def terminate(_, %{db: db}) do
- :rocksdb.close(db)
- end
end
:gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
end
- def prefix(name, prefix) do
- Logger.debug("PREFIX - #{prefix}")
- :gen_statem.call(ClusterKV.ring_name(name), {:prefix, prefix})
+ def prefix(name, key, split_on, min) do
+ Logger.debug("PREFIX - #{key} / #{inspect(split_on)} : #{min}")
+ :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
end
def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
%SL{data: %Ring{name: n, node: node, requests: reqs, ring: r, replicas: repls} = data} =
sl
) do
- nodes = HashRing.key_to_nodes(r, key, repls)
-
- node =
- case node in nodes do
- true -> node
- false -> Enum.random(nodes)
- end
+ node = get_node(key, r, node, repls)
ref = make_ref()
send({n, node}, {:get_key, key, ref, node})
def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
def handle_call(
- {:prefix, prefix},
+ {:prefix, key, split_on, min},
from,
@ready,
- %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r} = data} = sl
+ %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
) do
- nodes = HashRing.nodes(r)
ref = make_ref()
-
- Enum.each(nodes, fn node ->
- Logger.debug("Prefix request to #{inspect(node)}")
- send({n, node}, {:get_prefix, prefix, ref, me})
+ parts = String.split(key, split_on)
+ head = Enum.slice(parts, 0..(min - 1))
+ itr = Enum.slice(parts, min..-2)
+ get_prefix(head, key, r, me, repls, n, ref)
+
+ Enum.reduce(itr, head, fn ns, acc ->
+ next = acc ++ [ns]
+ get_prefix(next, key, r, me, repls, n, ref)
+ next
end)
- {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(nodes), []} | reqs]}}, []}
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(itr) + 1, []} | reqs]}}, []}
end
def handle_call({:prefix, _prefix}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
{:ok, sl, []}
end
- def handle_info(
- {:get_prefix, prefix, ref, node},
- @ready,
- %SL{data: %Ring{name: n, db: db}} = sl
- ) do
- val = DB.get_prefix(db, prefix)
- send({n, node}, {:reply, val, ref})
- {:ok, sl, []}
- end
-
def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
{requests, actions} = maybe_reply(ref, reqs, val)
{:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
[{:next_event, :internal, :node_down}]}
end
+ defp get_prefix(next, key, r, me, repls, n, ref) do
+ prefix = Enum.join(next, ".")
+ node = get_node(key, r, me, repls)
+ send({n, node}, {:get_key, prefix, ref, me})
+ end
+
+ defp get_node(key, r, node, repls) do
+ nodes = HashRing.key_to_nodes(r, key, repls)
+
+ case node in nodes do
+ true -> node
+ false -> Enum.random(nodes)
+ end
+ end
+
defp maybe_reply(ref, reqs, val) do
case get_request(ref, reqs) do
{_, from, 1, res} ->
defp get_final_value([], val), do: val
defp get_final_value(res, val) when is_list(res),
- do: {:ok, MapSet.new(List.flatten([val | res]))}
+ do:
+ {:ok,
+ [val | res]
+ |> Enum.filter(fn
+ :not_found -> false
+ _ -> true
+ end)}
defp get_request(ref, reqs) do
Enum.find(reqs, fn
[
{:libcluster, "~> 3.2"},
{:libring, "~> 1.4"},
- {:rocksdb, "~> 1.5"},
{:states_language, "~> 0.2"}
]
end
"libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
- "rocksdb": {:hex, :rocksdb, "1.5.1", "016861fbfb1f76ec2c48f745fca1f6e140fc0f27adc8c389c4cabce4a79b5f2f", [:rebar3], [], "hexpm", "63db7dfd65082d61df92810f61bd934080c77bc57eb711b36bb56a8926bdf3b0"},
"states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
"xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"},