]> Entropealabs - cluster_kv.git/commitdiff
rebalance cluster when nodes go up and down
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 04:01:41 +0000 (22:01 -0600)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 04:01:41 +0000 (22:01 -0600)
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index 3d48b99f207ca88759b3045d0c81f7cec0feadc7..f334c47b7e16183c5a64edd9d95a3256f4fa78fa 100644 (file)
@@ -6,10 +6,14 @@ defmodule ClusterKV.DB do
 
   @db_options [:set, :public]
 
-  defstruct [:db]
+  defstruct [:db, :batch, :batch_chunk, :batch_fun, last_batch: 0]
 
   @type t :: %__MODULE__{
-          db: :ets.tid() | atom()
+          db: :ets.tid() | atom(),
+          batch: list(),
+          batch_chunk: integer(),
+          batch_fun: fun(),
+          last_batch: integer()
         }
 
   @spec get(name :: module(), key :: binary()) :: [tuple()]
@@ -27,8 +31,8 @@ defmodule ClusterKV.DB do
     GenServer.cast(name, {:upsert, key, value, fun})
   end
 
-  def batch(name, batch, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
-    GenServer.cast(name, {:batch, batch, fun})
+  def batch(name, batch, chunk \\ 100, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
+    GenServer.cast(name, {:batch, batch, chunk, fun})
   end
 
   def stream(name, timeout \\ :infinity) do
@@ -63,12 +67,34 @@ defmodule ClusterKV.DB do
     {:noreply, state}
   end
 
-  def handle_cast({:batch, batch, fun}, %DB{db: db} = state) do
-    Enum.each(batch, fn {k, v} ->
+  def handle_cast({:batch, batch, chunk, fun}, %DB{db: db, last_batch: lb} = state) do
+    {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
+    {:noreply, %DB{state | batch: batch, batch_chunk: chunk, batch_fun: fun, last_batch: lb}}
+  end
+
+  def handle_info(
+        :process_batch,
+        %DB{db: db, batch_chunk: chunk, batch_fun: fun, batch: batch, last_batch: lb} = state
+      ) do
+    {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
+    {:noreply, %DB{state | batch: batch, last_batch: lb}}
+  end
+
+  defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
+    batch
+    |> Enum.slice(last_batch, chunk)
+    |> Enum.each(fn {k, v} ->
       do_upsert(db, k, v, fun)
     end)
 
-    {:noreply, state}
+    case last_batch + chunk do
+      lb when lb > length(batch) ->
+        {[], 0}
+
+      lb ->
+        Process.send_after(self(), :process_batch, 0)
+        {batch, lb}
+    end
   end
 
   defp do_upsert(db, key, value, fun) do
index 74082c85375f1c1e816969a91f7b5246cb882be7..4feea0022d043b45a876c0d8af26d0a99a380665 100644 (file)
@@ -277,26 +277,27 @@ defmodule ClusterKV.Ring do
   def handle_info(
         {:nodeup, node, info},
         _,
-        %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
+        %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
       ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.add_node(r, node)
-    # sync_db(n, me, db, ring, repls)
+    populate_new_node(node, n, db, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
   def handle_info(
         {:nodedown, node, info},
         _,
-        %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
+        %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
       ) do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.remove_node(r, node)
-    # sync_db(n, me, db, ring, repls)
+    redistribute_old_node(n, db, r, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
   end
 
   def handle_info({:handle_batch, batch}, @ready, %SL{data: %Ring{db: db}} = sl) do
+    Logger.info("Handling Batch")
     DB.batch(db, batch)
     {:ok, sl, []}
   end
@@ -334,6 +335,42 @@ defmodule ClusterKV.Ring do
     end)
   end
 
+  def populate_new_node(node, name, db, ring, repls) do
+    batch =
+      db
+      |> DB.stream()
+      |> Enum.reduce(%{}, fn {k, v}, acc ->
+        nodes = HashRing.key_to_nodes(ring, k, repls)
+
+        case node in nodes do
+          true ->
+            Map.update(acc, node, [], &[{k, v} | &1])
+
+          false ->
+            acc
+        end
+      end)
+
+    send_batch(name, batch)
+  end
+
+  def redistribute_old_node(name, db, old_ring, new_ring, repls) do
+    batch =
+      db
+      |> DB.stream()
+      |> Enum.reduce(%{}, fn {k, v}, acc ->
+        old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+        new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+        nodes = MapSet.intersection(new_nodes, old_nodes) |> MapSet.to_list()
+
+        Enum.reduce(nodes, acc, fn n, a ->
+          Map.update(a, n, [], &[{k, v} | &1])
+        end)
+      end)
+
+    send_batch(name, batch)
+  end
+
   defp maybe_reply(ref, reqs, val) do
     case get_request(ref, reqs) do
       {_, from, 1, res} ->