From: Christopher Date: Sun, 8 Mar 2020 05:53:28 +0000 (-0600) Subject: use saame redistribution for node up and down X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=1d6a497397461fa115904936a3e593d56319791f;p=cluster_kv.git use saame redistribution for node up and down --- diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index b4b5157..90d9da1 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -281,7 +281,7 @@ defmodule ClusterKV.Ring do ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - populate_new_node(node, n, db, ring, repls) + redistribute_data(n, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end @@ -292,7 +292,7 @@ defmodule ClusterKV.Ring do ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) - redistribute_old_node(n, db, r, ring, repls) + redistribute_data(n, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end @@ -335,26 +335,7 @@ defmodule ClusterKV.Ring do end) end - def populate_new_node(node, name, db, ring, repls) do - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - nodes = HashRing.key_to_nodes(ring, k, repls) - - case node in nodes do - true -> - Map.update(acc, node, [], &[{k, v} | &1]) - - false -> - acc - end - end) - - send_batch(name, batch) - end - - def redistribute_old_node(name, db, old_ring, new_ring, repls) do + def redistribute_data(name, db, old_ring, new_ring, repls) do batch = db |> DB.stream()