]> Entropealabs - cluster_kv.git/commitdiff
use saame redistribution for node up and down
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 05:53:28 +0000 (23:53 -0600)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 05:53:28 +0000 (23:53 -0600)
lib/cluster_kv/ring.ex

index b4b5157b14783926a3cbbdc99588760e22f5b2ae..90d9da17e46b749d10d13ac04e4cc6087eeaf6c1 100644 (file)
@@ -281,7 +281,7 @@ defmodule ClusterKV.Ring do
       ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.add_node(r, node)
-    populate_new_node(node, n, db, ring, repls)
+    redistribute_data(n, db, r, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
@@ -292,7 +292,7 @@ defmodule ClusterKV.Ring do
       ) do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.remove_node(r, node)
-    redistribute_old_node(n, db, r, ring, repls)
+    redistribute_data(n, db, r, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
   end
 
@@ -335,26 +335,7 @@ defmodule ClusterKV.Ring do
     end)
   end
 
-  def populate_new_node(node, name, db, ring, repls) do
-    batch =
-      db
-      |> DB.stream()
-      |> Enum.reduce(%{}, fn {k, v}, acc ->
-        nodes = HashRing.key_to_nodes(ring, k, repls)
-
-        case node in nodes do
-          true ->
-            Map.update(acc, node, [], &[{k, v} | &1])
-
-          false ->
-            acc
-        end
-      end)
-
-    send_batch(name, batch)
-  end
-
-  def redistribute_old_node(name, db, old_ring, new_ring, repls) do
+  def redistribute_data(name, db, old_ring, new_ring, repls) do
     batch =
       db
       |> DB.stream()