]> Entropealabs - cluster_kv.git/commitdiff
send :init_node message to peers when node is ready for data
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 15:26:27 +0000 (10:26 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 15:26:27 +0000 (10:26 -0500)
lib/cluster_kv/ring.ex
priv/ring.json

index d4e5dd6b8d84f003bfa41362fd569152ae80951b..6106cf64bbc5d3264dba0418c1c158c5fa790eaf 100644 (file)
@@ -29,19 +29,13 @@ defmodule ClusterKV.Ring do
   # States
   @init "Init"
   @ready "Ready"
-  @quorum_node_up "QuorumNodeUp"
-  @quorum_node_down "QuorumNodeDown"
-  @ready_node_up "ReadyNodeUp"
-  @ready_node_down "ReadyNodeDown"
   @quorum "Quorum"
   @anti_entropy "AntiEntropy"
 
   # Resources
   @handle_init "HandleInit"
   @handle_ready "HandleReady"
-  @handle_node_up "HandleNodeUp"
   @await_quorum "AwaitQuorum"
-  @handle_node_down "HandleNodeDown"
   @handle_anti_entropy "HandleAntiEntropy"
 
   def put(name, key, value) do
@@ -103,17 +97,15 @@ defmodule ClusterKV.Ring do
     {:ok, sl, actions}
   end
 
-  def handle_resource(@handle_ready, _, @ready, sl) do
+  def handle_resource(@handle_ready, _, @ready, %SL{data: %Ring{name: name, node: me}} = sl) do
     Logger.info("Ready")
-    {:ok, sl, []}
-  end
+    nodes = Node.list()
 
-  def handle_resource(@handle_node_up, _, _, sl) do
-    {:ok, sl, [{:next_event, :internal, :node_added}]}
-  end
+    Enum.each(nodes, fn n ->
+      send({name, n}, {:init_node, me})
+    end)
 
-  def handle_resource(@handle_node_down, _, _, sl) do
-    {:ok, sl, [{:next_event, :internal, :node_removed}]}
+    {:ok, sl, []}
   end
 
   def handle_call(
@@ -275,11 +267,11 @@ defmodule ClusterKV.Ring do
   def handle_info(
         {:nodeup, node, info},
         _,
-        %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
+        %SL{data: %Ring{ring: r} = data} = sl
       ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.add_node(r, node)
-    redistribute_data(n, db, r, ring, repls)
+    # wait for node to be ready for data, they will send us an :init_node message
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
@@ -300,6 +292,22 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_info({:init_node, node}, _, %SL{data: %Ring{name: n, ring: r, db: db}}) do
+    batch =
+      db
+      |> DB.stream()
+      |> Enum.reduce(%{}, fn {k, v}, acc ->
+        d_node = HashRing.key_to_node(r, k)
+
+        case d_node do
+          ^node -> Map.update(acc, node, [{k, v}], &[{k, v}, &1])
+          _ -> acc
+        end
+      end)
+
+    send_batch(n, batch)
+  end
+
   def handle_info(e, s, sl) do
     Logger.info("Unknown Info Event: #{inspect(e)} in state #{s} with data #{inspect(sl)}")
     {:ok, sl, []}
index 4a07b37c16c9b48fa0bf5acb11b27ec77c9ebd9f..21675f642d82274ce1fdd710b1527a0becaca65d 100644 (file)
       "Type": "Choice",
       "Resource": "HandleReady",
       "Choices": [
-        {
-          "StringEquals": ":node_up",
-          "Next": "ReadyNodeUp"
-        },
-        {
-          "StringEquals": ":node_down",
-          "Next": "ReadyNodeDown"
-        },
         {
           "StringEquals": ":anti_entropy",
           "Next": "AntiEntropy"
       "InputPath": "",
       "OutputPath": ""
     },
-    "QuorumNodeUp": {
-      "Type": "Task",
-      "Resource": "HandleNodeUp",
-      "TransitionEvent": ":node_added",
-      "Catch": [],
-      "InputPath": "",
-      "OutputPath": "",
-      "ResourcePath": "",
-      "Next": "Quorum",
-      "End": false
-    },
-    "ReadyNodeUp": {
-      "Type": "Task",
-      "Resource": "HandleNodeUp",
-      "TransitionEvent": ":node_added",
-      "Catch": [],
-      "InputPath": "",
-      "OutputPath": "",
-      "ResourcePath": "",
-      "Next": "Ready",
-      "End": false
-    },
     "Quorum": {
       "Type": "Choice",
       "Resource": "AwaitQuorum",
         {
           "StringEquals": ":quorum",
           "Next": "Ready"
-        },
-        {
-          "StringEquals": ":node_up",
-          "Next": "QuorumNodeUp"
-        },
-        {
-          "StringEquals": ":node_down",
-          "Next": "QuorumNodeDown"
         }
       ],
       "InputPath": "",
       "OutputPath": ""
     },
-    "ReadyNodeDown": {
-      "Type": "Task",
-      "Resource": "HandleNodeDown",
-      "TransitionEvent": ":node_removed",
-      "Catch": [],
-      "InputPath": "",
-      "OutputPath": "",
-      "ResourcePath": "",
-      "Next": "Quorum",
-      "End": false
-    },
-    "QuorumNodeDown": {
-      "Type": "Task",
-      "Resource": "HandleNodeDown",
-      "TransitionEvent": ":node_removed",
-      "Catch": [],
-      "InputPath": "",
-      "OutputPath": "",
-      "ResourcePath": "",
-      "Next": "Quorum",
-      "End": false
-    },
     "AntiEntropy": {
       "Type": "Task",
       "Resource": "HandleAntiEntropy",