From: Christopher Date: Sat, 7 Mar 2020 07:36:52 +0000 (-0600) Subject: database streaming and syncing X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=8bbb704da2ab775d0e6fe5a9187ea75c308026a9;p=cluster_kv.git database streaming and syncing --- diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 9a40d13..a8fe61b 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -45,4 +45,5 @@ defmodule ClusterKV do defdelegate get(name, key), to: Ring defdelegate put(name, key, value), to: Ring defdelegate prefix(name, key, split_on, min), to: Ring + defdelegate stream(name), to: Ring end diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 9d7ccc1..86e5a88 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -4,7 +4,7 @@ defmodule ClusterKV.DB do alias __MODULE__ - @db_options [:set, :private] + @db_options [:set] defstruct [:db] @@ -22,6 +22,10 @@ defmodule ClusterKV.DB do GenServer.cast(name, {:put, key, value}) end + def stream(name) do + GenServer.call(name, :stream) + end + def start_link(name: name) do GenServer.start_link(__MODULE__, name, name: name) end @@ -32,6 +36,10 @@ defmodule ClusterKV.DB do {:ok, %DB{db: db}} end + def handle_call(:stream, _from, %DB{db: db} = state) do + {:reply, Stream.iterate(:ets.first(db), &:ets.next(db, &1)), state} + end + def handle_call({:get, key}, _from, %DB{db: db} = state) do {:reply, case :ets.lookup(db, key) do diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 1b014eb..085e060 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -59,6 +59,10 @@ defmodule ClusterKV.Ring do :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}) end + def stream(name) do + :gen_statem.call(ClusterKV.ring_name(name), :stream) + end + def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do node = Node.self() @@ -96,6 +100,16 @@ defmodule ClusterKV.Ring do {:ok, sl, [{:next_event, :internal, :node_removed}]} end + def handle_call( + :stream, + from, + @ready, + %SL{data: %Ring{name: n, node: me, db: db, ring: ring, replicas: repls}} = sl + ) do + s = sync_db(n, me, db, ring, repls) + {:ok, sl, [{:reply, from, s}]} + end + def handle_call( {:get, key}, from, @@ -167,18 +181,26 @@ defmodule ClusterKV.Ring do {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions} end - def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do + def handle_info( + {:nodeup, node, info}, + _, + %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl + ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") - - {:ok, %SL{sl | data: %Ring{data | ring: HashRing.add_node(r, node)}}, - [{:next_event, :internal, :node_up}]} + ring = HashRing.add_node(r, node) + sync_db(n, me, db, ring, repls) + {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end - def handle_info({:nodedown, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do + def handle_info( + {:nodedown, node, info}, + _, + %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl + ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") - - {:ok, %SL{sl | data: %Ring{data | ring: HashRing.remove_node(r, node)}}, - [{:next_event, :internal, :node_down}]} + ring = HashRing.remove_node(r, node) + sync_db(n, me, db, ring, repls) + {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end defp get_prefix(next, key, r, me, repls, n, ref) do @@ -196,6 +218,34 @@ defmodule ClusterKV.Ring do end end + defp sync_db(name, me, db, ring, repls) do + Task.start(fn -> + db + |> DB.stream() + |> Stream.take_while(fn + :"$end_of_table" -> false + _ -> true + end) + |> Stream.chunk_every(10) + |> Stream.each(fn chunk -> + Enum.each(chunk, fn key -> + nodes = + ring + |> HashRing.key_to_nodes(key, repls) + |> Enum.filter(fn + ^me -> false + _ -> true + end) + + v = DB.get(db, key) + Logger.debug("Sync: #{key}: #{inspect(v)}") + cast_sync(name, nodes, key, v) + end) + end) + |> Stream.run() + end) + end + defp maybe_reply(ref, reqs, val) do case get_request(ref, reqs) do {_, from, 1, res} ->