def handle_info(
{:nodeup, node, info},
_,
- %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
+ %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.add_node(r, node)
- redistribute_data(n, db, r, ring, repls)
+ redistribute_data(n, me, db, r, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
def handle_info(
{:nodedown, node, info},
_,
- %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
+ %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
) do
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.remove_node(r, node)
- redistribute_data(n, db, r, ring, repls)
+ redistribute_data(n, me, db, r, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
end)
end
- def redistribute_data(name, db, old_ring, new_ring, repls) do
- batch =
- db
- |> DB.stream()
- |> Enum.reduce(%{}, fn {k, v}, acc ->
- old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
- new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
- nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
+ def redistribute_data(name, me, db, old_ring, new_ring, repls) do
+ Task.start(fn ->
+ batch =
+ db
+ |> DB.stream()
+ |> Enum.reduce(%{}, fn {k, v}, acc ->
+ old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+ new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+ nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
- Enum.reduce(nodes, acc, fn n, a ->
- Map.update(a, n, [], &[{k, v} | &1])
+ Enum.reduce(nodes, acc, fn
+ ^me, a ->
+ a
+
+ n, a ->
+ Map.update(a, n, [], &[{k, v} | &1])
+ end)
end)
- end)
- send_batch(name, batch)
+ send_batch(name, batch)
+ end)
end
defp maybe_reply(ref, reqs, val) do