From 3c3589ed9f35b0cc076f484276dea8b144c1e49a Mon Sep 17 00:00:00 2001 From: Christopher Date: Sun, 8 Mar 2020 15:24:38 -0500 Subject: [PATCH] start cluster after ring is up --- lib/cluster_kv.ex | 4 ++-- lib/cluster_kv/db.ex | 2 +- lib/cluster_kv/ring.ex | 41 +++-------------------------------------- 3 files changed, 6 insertions(+), 41 deletions(-) diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 12aa55b..4910122 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -31,8 +31,8 @@ defmodule ClusterKV do children = [ {DB, [name: db]}, - {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}, - {Ring, [{:local, ring}, ring_data, []]} + {Ring, [{:local, ring}, ring_data, []]}, + {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]} ] Supervisor.init(children, strategy: :rest_for_one, max_restarts: 5, max_seconds: 10) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index e1fba76..c9a966b 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -92,7 +92,7 @@ defmodule ClusterKV.DB do {[], 0} lb -> - Process.send_after(self(), :process_batch, 10) + Process.send_after(self(), :process_batch, 0) {batch, lb} end end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index c69cc97..f010a0d 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -13,7 +13,6 @@ defmodule ClusterKV.Ring do :ring, :db, :node, - init: true, requests: [], anti_entropy_interval: 5 * 60_000 ] @@ -23,7 +22,6 @@ defmodule ClusterKV.Ring do quorum: integer(), db: module(), node: node(), - init: boolean(), ring: HashRing.t(), anti_entropy_interval: integer() } @@ -105,29 +103,13 @@ defmodule ClusterKV.Ring do @handle_ready, _, @ready, - %SL{data: %Ring{init: false}} = sl + sl ) do Logger.info("Ready") {:ok, sl, []} end - def handle_resource( - @handle_ready, - _, - @ready, - %SL{data: %Ring{init: true, name: name, node: me} = data} = sl - ) do - Logger.info("Ready") - nodes = Node.list() - - Enum.each(nodes, fn n -> - send({name, n}, {:init_node, me}) - end) - - {:ok, %SL{sl | data: %Ring{data | init: false}}, []} - end - def handle_resource(@handle_node_up, _, _, sl) do {:ok, sl, [{:next_event, :internal, :node_added}]} end @@ -295,11 +277,11 @@ defmodule ClusterKV.Ring do def handle_info( {:nodeup, node, info}, _, - %SL{data: %Ring{ring: r} = data} = sl + %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - # wait for node to be ready for data, they will send us an :init_node message + redistribute_data(n, db, r, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end @@ -320,23 +302,6 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end - def handle_info({:init_node, node}, _, %SL{data: %Ring{name: n, ring: r, db: db}} = sl) do - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - d_node = HashRing.key_to_node(r, k) - - case d_node do - ^node -> Map.update(acc, node, [{k, v}], &[{k, v} | &1]) - _ -> acc - end - end) - - send_batch(n, batch) - {:ok, sl, []} - end - def handle_info(e, s, sl) do Logger.info("Unknown Info Event: #{inspect(e)} in state #{s} with data #{inspect(sl)}") {:ok, sl, []} -- 2.45.3