From d689a702118ad32d4b6faa132a0e16567eafd93b Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 7 Mar 2020 22:01:41 -0600 Subject: [PATCH] rebalance cluster when nodes go up and down --- lib/cluster_kv/db.ex | 40 ++++++++++++++++++++++++++++++------- lib/cluster_kv/ring.ex | 45 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 3d48b99..f334c47 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -6,10 +6,14 @@ defmodule ClusterKV.DB do @db_options [:set, :public] - defstruct [:db] + defstruct [:db, :batch, :batch_chunk, :batch_fun, last_batch: 0] @type t :: %__MODULE__{ - db: :ets.tid() | atom() + db: :ets.tid() | atom(), + batch: list(), + batch_chunk: integer(), + batch_fun: fun(), + last_batch: integer() } @spec get(name :: module(), key :: binary()) :: [tuple()] @@ -27,8 +31,8 @@ defmodule ClusterKV.DB do GenServer.cast(name, {:upsert, key, value, fun}) end - def batch(name, batch, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do - GenServer.cast(name, {:batch, batch, fun}) + def batch(name, batch, chunk \\ 100, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do + GenServer.cast(name, {:batch, batch, chunk, fun}) end def stream(name, timeout \\ :infinity) do @@ -63,12 +67,34 @@ defmodule ClusterKV.DB do {:noreply, state} end - def handle_cast({:batch, batch, fun}, %DB{db: db} = state) do - Enum.each(batch, fn {k, v} -> + def handle_cast({:batch, batch, chunk, fun}, %DB{db: db, last_batch: lb} = state) do + {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun) + {:noreply, %DB{state | batch: batch, batch_chunk: chunk, batch_fun: fun, last_batch: lb}} + end + + def handle_info( + :process_batch, + %DB{db: db, batch_chunk: chunk, batch_fun: fun, batch: batch, last_batch: lb} = state + ) do + {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun) + {:noreply, %DB{state | batch: batch, last_batch: lb}} + end + + defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do + batch + |> Enum.slice(last_batch, chunk) + |> Enum.each(fn {k, v} -> do_upsert(db, k, v, fun) end) - {:noreply, state} + case last_batch + chunk do + lb when lb > length(batch) -> + {[], 0} + + lb -> + Process.send_after(self(), :process_batch, 0) + {batch, lb} + end end defp do_upsert(db, key, value, fun) do diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 74082c8..4feea00 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -277,26 +277,27 @@ defmodule ClusterKV.Ring do def handle_info( {:nodeup, node, info}, _, - %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl + %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - # sync_db(n, me, db, ring, repls) + populate_new_node(node, n, db, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end def handle_info( {:nodedown, node, info}, _, - %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl + %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) - # sync_db(n, me, db, ring, repls) + redistribute_old_node(n, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end def handle_info({:handle_batch, batch}, @ready, %SL{data: %Ring{db: db}} = sl) do + Logger.info("Handling Batch") DB.batch(db, batch) {:ok, sl, []} end @@ -334,6 +335,42 @@ defmodule ClusterKV.Ring do end) end + def populate_new_node(node, name, db, ring, repls) do + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + nodes = HashRing.key_to_nodes(ring, k, repls) + + case node in nodes do + true -> + Map.update(acc, node, [], &[{k, v} | &1]) + + false -> + acc + end + end) + + send_batch(name, batch) + end + + def redistribute_old_node(name, db, old_ring, new_ring, repls) do + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() + new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() + nodes = MapSet.intersection(new_nodes, old_nodes) |> MapSet.to_list() + + Enum.reduce(nodes, acc, fn n, a -> + Map.update(a, n, [], &[{k, v} | &1]) + end) + end) + + send_batch(name, batch) + end + defp maybe_reply(ref, reqs, val) do case get_request(ref, reqs) do {_, from, 1, res} -> -- 2.45.3