alias __MODULE__
- @db_options [:set, :private]
+ @db_options [:set]
defstruct [:db]
GenServer.cast(name, {:put, key, value})
end
+ def stream(name) do
+ GenServer.call(name, :stream)
+ end
+
def start_link(name: name) do
GenServer.start_link(__MODULE__, name, name: name)
end
{:ok, %DB{db: db}}
end
+ def handle_call(:stream, _from, %DB{db: db} = state) do
+ {:reply, Stream.iterate(:ets.first(db), &:ets.next(db, &1)), state}
+ end
+
def handle_call({:get, key}, _from, %DB{db: db} = state) do
{:reply,
case :ets.lookup(db, key) do
:gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
end
+ def stream(name) do
+ :gen_statem.call(ClusterKV.ring_name(name), :stream)
+ end
+
def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
node = Node.self()
{:ok, sl, [{:next_event, :internal, :node_removed}]}
end
+ def handle_call(
+ :stream,
+ from,
+ @ready,
+ %SL{data: %Ring{name: n, node: me, db: db, ring: ring, replicas: repls}} = sl
+ ) do
+ s = sync_db(n, me, db, ring, repls)
+ {:ok, sl, [{:reply, from, s}]}
+ end
+
def handle_call(
{:get, key},
from,
{:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
end
- def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+ def handle_info(
+ {:nodeup, node, info},
+ _,
+ %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+ ) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
-
- {:ok, %SL{sl | data: %Ring{data | ring: HashRing.add_node(r, node)}},
- [{:next_event, :internal, :node_up}]}
+ ring = HashRing.add_node(r, node)
+ sync_db(n, me, db, ring, repls)
+ {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
- def handle_info({:nodedown, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+ def handle_info(
+ {:nodedown, node, info},
+ _,
+ %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+ ) do
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
-
- {:ok, %SL{sl | data: %Ring{data | ring: HashRing.remove_node(r, node)}},
- [{:next_event, :internal, :node_down}]}
+ ring = HashRing.remove_node(r, node)
+ sync_db(n, me, db, ring, repls)
+ {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
defp get_prefix(next, key, r, me, repls, n, ref) do
end
end
+ defp sync_db(name, me, db, ring, repls) do
+ Task.start(fn ->
+ db
+ |> DB.stream()
+ |> Stream.take_while(fn
+ :"$end_of_table" -> false
+ _ -> true
+ end)
+ |> Stream.chunk_every(10)
+ |> Stream.each(fn chunk ->
+ Enum.each(chunk, fn key ->
+ nodes =
+ ring
+ |> HashRing.key_to_nodes(key, repls)
+ |> Enum.filter(fn
+ ^me -> false
+ _ -> true
+ end)
+
+ v = DB.get(db, key)
+ Logger.debug("Sync: #{key}: #{inspect(v)}")
+ cast_sync(name, nodes, key, v)
+ end)
+ end)
+ |> Stream.run()
+ end)
+ end
+
defp maybe_reply(ref, reqs, val) do
case get_request(ref, reqs) do
{_, from, 1, res} ->