]> Entropealabs - cluster_kv.git/commitdiff
don't send rebalancing records to self, run in async task
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 21:11:14 +0000 (16:11 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 21:11:14 +0000 (16:11 -0500)
lib/cluster_kv/ring.ex

index 8b16df63f032003c823d2bb51fc02e44d672639a..7b133ba4525d81d843d3667ad4b7bfbeefd24556 100644 (file)
@@ -277,22 +277,22 @@ defmodule ClusterKV.Ring do
   def handle_info(
         {:nodeup, node, info},
         _,
-        %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
+        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
       ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.add_node(r, node)
-    redistribute_data(n, db, r, ring, repls)
+    redistribute_data(n, me, db, r, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
   def handle_info(
         {:nodedown, node, info},
         _,
-        %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
+        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
       ) do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.remove_node(r, node)
-    redistribute_data(n, db, r, ring, repls)
+    redistribute_data(n, me, db, r, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
   end
 
@@ -341,21 +341,27 @@ defmodule ClusterKV.Ring do
     end)
   end
 
-  def redistribute_data(name, db, old_ring, new_ring, repls) do
-    batch =
-      db
-      |> DB.stream()
-      |> Enum.reduce(%{}, fn {k, v}, acc ->
-        old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
-        new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
-        nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
+  def redistribute_data(name, me, db, old_ring, new_ring, repls) do
+    Task.start(fn ->
+      batch =
+        db
+        |> DB.stream()
+        |> Enum.reduce(%{}, fn {k, v}, acc ->
+          old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+          new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+          nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
 
-        Enum.reduce(nodes, acc, fn n, a ->
-          Map.update(a, n, [], &[{k, v} | &1])
+          Enum.reduce(nodes, acc, fn
+            ^me, a ->
+              a
+
+            n, a ->
+              Map.update(a, n, [], &[{k, v} | &1])
+          end)
         end)
-      end)
 
-    send_batch(name, batch)
+      send_batch(name, batch)
+    end)
   end
 
   defp maybe_reply(ref, reqs, val) do