From c65eedbdfe8ece047d031caeba378e1e45f1e4b1 Mon Sep 17 00:00:00 2001 From: Christopher Date: Sun, 8 Mar 2020 10:26:27 -0500 Subject: [PATCH] send :init_node message to peers when node is ready for data --- lib/cluster_kv/ring.ex | 40 +++++++++++++++++----------- priv/ring.json | 60 ------------------------------------------ 2 files changed, 24 insertions(+), 76 deletions(-) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index d4e5dd6..6106cf6 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -29,19 +29,13 @@ defmodule ClusterKV.Ring do # States @init "Init" @ready "Ready" - @quorum_node_up "QuorumNodeUp" - @quorum_node_down "QuorumNodeDown" - @ready_node_up "ReadyNodeUp" - @ready_node_down "ReadyNodeDown" @quorum "Quorum" @anti_entropy "AntiEntropy" # Resources @handle_init "HandleInit" @handle_ready "HandleReady" - @handle_node_up "HandleNodeUp" @await_quorum "AwaitQuorum" - @handle_node_down "HandleNodeDown" @handle_anti_entropy "HandleAntiEntropy" def put(name, key, value) do @@ -103,17 +97,15 @@ defmodule ClusterKV.Ring do {:ok, sl, actions} end - def handle_resource(@handle_ready, _, @ready, sl) do + def handle_resource(@handle_ready, _, @ready, %SL{data: %Ring{name: name, node: me}} = sl) do Logger.info("Ready") - {:ok, sl, []} - end + nodes = Node.list() - def handle_resource(@handle_node_up, _, _, sl) do - {:ok, sl, [{:next_event, :internal, :node_added}]} - end + Enum.each(nodes, fn n -> + send({name, n}, {:init_node, me}) + end) - def handle_resource(@handle_node_down, _, _, sl) do - {:ok, sl, [{:next_event, :internal, :node_removed}]} + {:ok, sl, []} end def handle_call( @@ -275,11 +267,11 @@ defmodule ClusterKV.Ring do def handle_info( {:nodeup, node, info}, _, - %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl + %SL{data: %Ring{ring: r} = data} = sl ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - redistribute_data(n, db, r, ring, repls) + # wait for node to be ready for data, they will send us an :init_node message {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end @@ -300,6 +292,22 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_info({:init_node, node}, _, %SL{data: %Ring{name: n, ring: r, db: db}}) do + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + d_node = HashRing.key_to_node(r, k) + + case d_node do + ^node -> Map.update(acc, node, [{k, v}], &[{k, v}, &1]) + _ -> acc + end + end) + + send_batch(n, batch) + end + def handle_info(e, s, sl) do Logger.info("Unknown Info Event: #{inspect(e)} in state #{s} with data #{inspect(sl)}") {:ok, sl, []} diff --git a/priv/ring.json b/priv/ring.json index 4a07b37..21675f6 100644 --- a/priv/ring.json +++ b/priv/ring.json @@ -17,14 +17,6 @@ "Type": "Choice", "Resource": "HandleReady", "Choices": [ - { - "StringEquals": ":node_up", - "Next": "ReadyNodeUp" - }, - { - "StringEquals": ":node_down", - "Next": "ReadyNodeDown" - }, { "StringEquals": ":anti_entropy", "Next": "AntiEntropy" @@ -33,28 +25,6 @@ "InputPath": "", "OutputPath": "" }, - "QuorumNodeUp": { - "Type": "Task", - "Resource": "HandleNodeUp", - "TransitionEvent": ":node_added", - "Catch": [], - "InputPath": "", - "OutputPath": "", - "ResourcePath": "", - "Next": "Quorum", - "End": false - }, - "ReadyNodeUp": { - "Type": "Task", - "Resource": "HandleNodeUp", - "TransitionEvent": ":node_added", - "Catch": [], - "InputPath": "", - "OutputPath": "", - "ResourcePath": "", - "Next": "Ready", - "End": false - }, "Quorum": { "Type": "Choice", "Resource": "AwaitQuorum", @@ -62,41 +32,11 @@ { "StringEquals": ":quorum", "Next": "Ready" - }, - { - "StringEquals": ":node_up", - "Next": "QuorumNodeUp" - }, - { - "StringEquals": ":node_down", - "Next": "QuorumNodeDown" } ], "InputPath": "", "OutputPath": "" }, - "ReadyNodeDown": { - "Type": "Task", - "Resource": "HandleNodeDown", - "TransitionEvent": ":node_removed", - "Catch": [], - "InputPath": "", - "OutputPath": "", - "ResourcePath": "", - "Next": "Quorum", - "End": false - }, - "QuorumNodeDown": { - "Type": "Task", - "Resource": "HandleNodeDown", - "TransitionEvent": ":node_removed", - "Catch": [], - "InputPath": "", - "OutputPath": "", - "ResourcePath": "", - "Next": "Quorum", - "End": false - }, "AntiEntropy": { "Type": "Task", "Resource": "HandleAntiEntropy", -- 2.45.3