From de1cffe6dd381e88bdf5f8c245b8da986cf068f0 Mon Sep 17 00:00:00 2001 From: Christopher Date: Sun, 8 Mar 2020 16:11:14 -0500 Subject: [PATCH] don't send rebalancing records to self, run in async task --- lib/cluster_kv/ring.ex | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 8b16df6..7b133ba 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -277,22 +277,22 @@ defmodule ClusterKV.Ring do def handle_info( {:nodeup, node, info}, _, - %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl + %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - redistribute_data(n, db, r, ring, repls) + redistribute_data(n, me, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end def handle_info( {:nodedown, node, info}, _, - %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl + %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) - redistribute_data(n, db, r, ring, repls) + redistribute_data(n, me, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end @@ -341,21 +341,27 @@ defmodule ClusterKV.Ring do end) end - def redistribute_data(name, db, old_ring, new_ring, repls) do - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() - new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() - nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() + def redistribute_data(name, me, db, old_ring, new_ring, repls) do + Task.start(fn -> + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() + new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() + nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() - Enum.reduce(nodes, acc, fn n, a -> - Map.update(a, n, [], &[{k, v} | &1]) + Enum.reduce(nodes, acc, fn + ^me, a -> + a + + n, a -> + Map.update(a, n, [], &[{k, v} | &1]) + end) end) - end) - send_batch(name, batch) + send_batch(name, batch) + end) end defp maybe_reply(ref, reqs, val) do -- 2.45.3