From 1d6a497397461fa115904936a3e593d56319791f Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 7 Mar 2020 23:53:28 -0600 Subject: [PATCH] use saame redistribution for node up and down --- lib/cluster_kv/ring.ex | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index b4b5157..90d9da1 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -281,7 +281,7 @@ defmodule ClusterKV.Ring do ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - populate_new_node(node, n, db, ring, repls) + redistribute_data(n, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end @@ -292,7 +292,7 @@ defmodule ClusterKV.Ring do ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) - redistribute_old_node(n, db, r, ring, repls) + redistribute_data(n, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end @@ -335,26 +335,7 @@ defmodule ClusterKV.Ring do end) end - def populate_new_node(node, name, db, ring, repls) do - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - nodes = HashRing.key_to_nodes(ring, k, repls) - - case node in nodes do - true -> - Map.update(acc, node, [], &[{k, v} | &1]) - - false -> - acc - end - end) - - send_batch(name, batch) - end - - def redistribute_old_node(name, db, old_ring, new_ring, repls) do + def redistribute_data(name, db, old_ring, new_ring, repls) do batch = db |> DB.stream() -- 2.45.3