]> Entropealabs - cluster_kv.git/commitdiff
database streaming and syncing
authorChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 07:36:52 +0000 (01:36 -0600)
committerChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 07:36:52 +0000 (01:36 -0600)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index 9a40d131f9e8fa344f7401b443caa038011636b8..a8fe61bd6bfd889254eb1cf52fb29c54b9c6768b 100644 (file)
@@ -45,4 +45,5 @@ defmodule ClusterKV do
   defdelegate get(name, key), to: Ring
   defdelegate put(name, key, value), to: Ring
   defdelegate prefix(name, key, split_on, min), to: Ring
+  defdelegate stream(name), to: Ring
 end
index 9d7ccc13661175f49e6a0a856d089e512e3a58ef..86e5a88223621ab5b4f41092054dc7407fdfd018 100644 (file)
@@ -4,7 +4,7 @@ defmodule ClusterKV.DB do
 
   alias __MODULE__
 
-  @db_options [:set, :private]
+  @db_options [:set]
 
   defstruct [:db]
 
@@ -22,6 +22,10 @@ defmodule ClusterKV.DB do
     GenServer.cast(name, {:put, key, value})
   end
 
+  def stream(name) do
+    GenServer.call(name, :stream)
+  end
+
   def start_link(name: name) do
     GenServer.start_link(__MODULE__, name, name: name)
   end
@@ -32,6 +36,10 @@ defmodule ClusterKV.DB do
     {:ok, %DB{db: db}}
   end
 
+  def handle_call(:stream, _from, %DB{db: db} = state) do
+    {:reply, Stream.iterate(:ets.first(db), &:ets.next(db, &1)), state}
+  end
+
   def handle_call({:get, key}, _from, %DB{db: db} = state) do
     {:reply,
      case :ets.lookup(db, key) do
index 1b014ebfbebd3bb485e7af2e6e1a1ef4fc8e0b03..085e06072a72bc236c7a4496fa8fadf23972c101 100644 (file)
@@ -59,6 +59,10 @@ defmodule ClusterKV.Ring do
     :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
   end
 
+  def stream(name) do
+    :gen_statem.call(ClusterKV.ring_name(name), :stream)
+  end
+
   def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
     node = Node.self()
 
@@ -96,6 +100,16 @@ defmodule ClusterKV.Ring do
     {:ok, sl, [{:next_event, :internal, :node_removed}]}
   end
 
+  def handle_call(
+        :stream,
+        from,
+        @ready,
+        %SL{data: %Ring{name: n, node: me, db: db, ring: ring, replicas: repls}} = sl
+      ) do
+    s = sync_db(n, me, db, ring, repls)
+    {:ok, sl, [{:reply, from, s}]}
+  end
+
   def handle_call(
         {:get, key},
         from,
@@ -167,18 +181,26 @@ defmodule ClusterKV.Ring do
     {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
   end
 
-  def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+  def handle_info(
+        {:nodeup, node, info},
+        _,
+        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+      ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
-
-    {:ok, %SL{sl | data: %Ring{data | ring: HashRing.add_node(r, node)}},
-     [{:next_event, :internal, :node_up}]}
+    ring = HashRing.add_node(r, node)
+    sync_db(n, me, db, ring, repls)
+    {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
-  def handle_info({:nodedown, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+  def handle_info(
+        {:nodedown, node, info},
+        _,
+        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+      ) do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
-
-    {:ok, %SL{sl | data: %Ring{data | ring: HashRing.remove_node(r, node)}},
-     [{:next_event, :internal, :node_down}]}
+    ring = HashRing.remove_node(r, node)
+    sync_db(n, me, db, ring, repls)
+    {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
   end
 
   defp get_prefix(next, key, r, me, repls, n, ref) do
@@ -196,6 +218,34 @@ defmodule ClusterKV.Ring do
     end
   end
 
+  defp sync_db(name, me, db, ring, repls) do
+    Task.start(fn ->
+      db
+      |> DB.stream()
+      |> Stream.take_while(fn
+        :"$end_of_table" -> false
+        _ -> true
+      end)
+      |> Stream.chunk_every(10)
+      |> Stream.each(fn chunk ->
+        Enum.each(chunk, fn key ->
+          nodes =
+            ring
+            |> HashRing.key_to_nodes(key, repls)
+            |> Enum.filter(fn
+              ^me -> false
+              _ -> true
+            end)
+
+          v = DB.get(db, key)
+          Logger.debug("Sync: #{key}: #{inspect(v)}")
+          cast_sync(name, nodes, key, v)
+        end)
+      end)
+      |> Stream.run()
+    end)
+  end
+
   defp maybe_reply(ref, reqs, val) do
     case get_request(ref, reqs) do
       {_, from, 1, res} ->