end
def redistribute_data(name, me, db, old_ring, new_ring, repls) do
- Task.start(fn ->
- batch =
- db
- |> DB.stream()
- |> Enum.reduce(%{}, fn {k, v}, acc ->
- old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
- new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
- nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
-
- Enum.reduce(nodes, acc, fn
- ^me, a ->
- a
-
- n, a ->
- Map.update(a, n, [], &[{k, v} | &1])
- end)
+ batch =
+ db
+ |> DB.stream()
+ |> Enum.reduce(%{}, fn {k, v}, acc ->
+ old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+ new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+ nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
+
+ Enum.reduce(nodes, acc, fn
+ ^me, a ->
+ a
+
+ n, a ->
+ Map.update(a, n, [], &[{k, v} | &1])
end)
+ end)
- send_batch(name, batch)
- end)
+ send_batch(name, batch)
end
defp maybe_reply(ref, reqs, val, node) do