]> Entropealabs - cluster_kv.git/commitdiff
start cluster after ring is up
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 20:24:38 +0000 (15:24 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 20:24:38 +0000 (15:24 -0500)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index 12aa55b6972a7ccf2918820952dc6d54fba419ec..49101226d3957cd9a07368b49f1d9bfe9c963bca 100644 (file)
@@ -31,8 +31,8 @@ defmodule ClusterKV do
 
     children = [
       {DB, [name: db]},
-      {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]},
-      {Ring, [{:local, ring}, ring_data, []]}
+      {Ring, [{:local, ring}, ring_data, []]},
+      {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}
     ]
 
     Supervisor.init(children, strategy: :rest_for_one, max_restarts: 5, max_seconds: 10)
index e1fba76758d64986cd7bb6405140e42d6cd24a9f..c9a966b75385686e5941df682e9cc114b4cf108c 100644 (file)
@@ -92,7 +92,7 @@ defmodule ClusterKV.DB do
         {[], 0}
 
       lb ->
-        Process.send_after(self(), :process_batch, 10)
+        Process.send_after(self(), :process_batch, 0)
         {batch, lb}
     end
   end
index c69cc97b77588bee3c3c79c513f9d0f0352aa815..f010a0d94d8abe04dbfd2302d6676b3849f82137 100644 (file)
@@ -13,7 +13,6 @@ defmodule ClusterKV.Ring do
     :ring,
     :db,
     :node,
-    init: true,
     requests: [],
     anti_entropy_interval: 5 * 60_000
   ]
@@ -23,7 +22,6 @@ defmodule ClusterKV.Ring do
           quorum: integer(),
           db: module(),
           node: node(),
-          init: boolean(),
           ring: HashRing.t(),
           anti_entropy_interval: integer()
         }
@@ -105,29 +103,13 @@ defmodule ClusterKV.Ring do
         @handle_ready,
         _,
         @ready,
-        %SL{data: %Ring{init: false}} = sl
+        sl
       ) do
     Logger.info("Ready")
 
     {:ok, sl, []}
   end
 
-  def handle_resource(
-        @handle_ready,
-        _,
-        @ready,
-        %SL{data: %Ring{init: true, name: name, node: me} = data} = sl
-      ) do
-    Logger.info("Ready")
-    nodes = Node.list()
-
-    Enum.each(nodes, fn n ->
-      send({name, n}, {:init_node, me})
-    end)
-
-    {:ok, %SL{sl | data: %Ring{data | init: false}}, []}
-  end
-
   def handle_resource(@handle_node_up, _, _, sl) do
     {:ok, sl, [{:next_event, :internal, :node_added}]}
   end
@@ -295,11 +277,11 @@ defmodule ClusterKV.Ring do
   def handle_info(
         {:nodeup, node, info},
         _,
-        %SL{data: %Ring{ring: r} = data} = sl
+        %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
       ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.add_node(r, node)
-    # wait for node to be ready for data, they will send us an :init_node message
+    redistribute_data(n, db, r, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
@@ -320,23 +302,6 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
-  def handle_info({:init_node, node}, _, %SL{data: %Ring{name: n, ring: r, db: db}} = sl) do
-    batch =
-      db
-      |> DB.stream()
-      |> Enum.reduce(%{}, fn {k, v}, acc ->
-        d_node = HashRing.key_to_node(r, k)
-
-        case d_node do
-          ^node -> Map.update(acc, node, [{k, v}], &[{k, v} | &1])
-          _ -> acc
-        end
-      end)
-
-    send_batch(n, batch)
-    {:ok, sl, []}
-  end
-
   def handle_info(e, s, sl) do
     Logger.info("Unknown Info Event: #{inspect(e)} in state #{s} with data #{inspect(sl)}")
     {:ok, sl, []}