From: Christopher Date: Mon, 9 Mar 2020 14:46:20 +0000 (-0500) Subject: don't start task for rebalancing data X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=f7550c5414d8af16db13c39b09f0d9bdd5c01a34;p=cluster_kv.git don't start task for rebalancing data --- diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 2924bf8..505a21a 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -358,26 +358,24 @@ defmodule ClusterKV.Ring do end def redistribute_data(name, me, db, old_ring, new_ring, repls) do - Task.start(fn -> - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() - new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() - nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() - - Enum.reduce(nodes, acc, fn - ^me, a -> - a - - n, a -> - Map.update(a, n, [], &[{k, v} | &1]) - end) + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() + new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() + nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() + + Enum.reduce(nodes, acc, fn + ^me, a -> + a + + n, a -> + Map.update(a, n, [], &[{k, v} | &1]) end) + end) - send_batch(name, batch) - end) + send_batch(name, batch) end defp maybe_reply(ref, reqs, val, node) do