) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.add_node(r, node)
- populate_new_node(node, n, db, ring, repls)
+ redistribute_data(n, db, r, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
) do
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.remove_node(r, node)
- redistribute_old_node(n, db, r, ring, repls)
+ redistribute_data(n, db, r, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
end)
end
- def populate_new_node(node, name, db, ring, repls) do
- batch =
- db
- |> DB.stream()
- |> Enum.reduce(%{}, fn {k, v}, acc ->
- nodes = HashRing.key_to_nodes(ring, k, repls)
-
- case node in nodes do
- true ->
- Map.update(acc, node, [], &[{k, v} | &1])
-
- false ->
- acc
- end
- end)
-
- send_batch(name, batch)
- end
-
- def redistribute_old_node(name, db, old_ring, new_ring, repls) do
+ def redistribute_data(name, db, old_ring, new_ring, repls) do
batch =
db
|> DB.stream()