From f7550c5414d8af16db13c39b09f0d9bdd5c01a34 Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 9 Mar 2020 09:46:20 -0500 Subject: [PATCH] don't start task for rebalancing data --- lib/cluster_kv/ring.ex | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 2924bf8..505a21a 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -358,26 +358,24 @@ defmodule ClusterKV.Ring do end def redistribute_data(name, me, db, old_ring, new_ring, repls) do - Task.start(fn -> - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() - new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() - nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() - - Enum.reduce(nodes, acc, fn - ^me, a -> - a - - n, a -> - Map.update(a, n, [], &[{k, v} | &1]) - end) + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() + new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() + nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() + + Enum.reduce(nodes, acc, fn + ^me, a -> + a + + n, a -> + Map.update(a, n, [], &[{k, v} | &1]) end) + end) - send_batch(name, batch) - end) + send_batch(name, batch) end defp maybe_reply(ref, reqs, val, node) do -- 2.45.3