]> Entropealabs - cluster_kv.git/commitdiff
don't start task for rebalancing data
authorChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 14:46:20 +0000 (09:46 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 14:46:20 +0000 (09:46 -0500)
lib/cluster_kv/ring.ex

index 2924bf8764e5b8d926f19f8c8c98158832df0a50..505a21a8aec71a73b6a587d0057e2623ff579f7c 100644 (file)
@@ -358,26 +358,24 @@ defmodule ClusterKV.Ring do
   end
 
   def redistribute_data(name, me, db, old_ring, new_ring, repls) do
-    Task.start(fn ->
-      batch =
-        db
-        |> DB.stream()
-        |> Enum.reduce(%{}, fn {k, v}, acc ->
-          old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
-          new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
-          nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
-
-          Enum.reduce(nodes, acc, fn
-            ^me, a ->
-              a
-
-            n, a ->
-              Map.update(a, n, [], &[{k, v} | &1])
-          end)
+    batch =
+      db
+      |> DB.stream()
+      |> Enum.reduce(%{}, fn {k, v}, acc ->
+        old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+        new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+        nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
+
+        Enum.reduce(nodes, acc, fn
+          ^me, a ->
+            a
+
+          n, a ->
+            Map.update(a, n, [], &[{k, v} | &1])
         end)
+      end)
 
-      send_batch(name, batch)
-    end)
+    send_batch(name, batch)
   end
 
   defp maybe_reply(ref, reqs, val, node) do