From 5674b9c799c71efee6772968fd83fc1e70661425 Mon Sep 17 00:00:00 2001 From: Christopher Date: Fri, 6 Mar 2020 17:47:51 -0600 Subject: [PATCH] use local node if available for gets --- lib/cluster_kv/db.ex | 1 + lib/cluster_kv/ring.ex | 31 +++++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 628c91e..fcb7c52 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -42,6 +42,7 @@ defmodule ClusterKV.DB do end def init(db_path) when is_list(db_path) do + Logger.info("Opening database at #{db_path}") {:ok, db} = :rocksdb.open(db_path, @db_options) {:ok, %DB{db: db}} end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 701bc94..2192fca 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -12,6 +12,7 @@ defmodule ClusterKV.Ring do :quorum, :ring, :db, + :node, requests: [], anti_entropy_interval: 5 * 60_000 ] @@ -20,6 +21,7 @@ defmodule ClusterKV.Ring do replicas: integer(), quorum: integer(), db: module(), + node: node(), ring: HashRing.t(), anti_entropy_interval: integer() } @@ -55,13 +57,17 @@ defmodule ClusterKV.Ring do end def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do + node = Node.self() + ring = - Node.self() + node |> HashRing.new() |> get_existing_nodes() :net_kernel.monitor_nodes(true, node_type: :all) - {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :initialized}]} + + {:ok, %SL{sl | data: %Ring{data | node: node, ring: ring}}, + [{:next_event, :internal, :initialized}]} end def handle_resource(@await_quorum, _, @quorum, %SL{data: %Ring{quorum: q, ring: r}} = sl) do @@ -91,15 +97,24 @@ defmodule ClusterKV.Ring do {:get, key}, from, @ready, - %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl + %SL{data: %Ring{name: n, node: node, requests: reqs, ring: r, replicas: repls} = data} = + sl ) do - node = HashRing.key_to_node(r, key) + nodes = HashRing.key_to_nodes(r, key, repls) + + node = + case node in nodes do + true -> node + false -> Enum.random(nodes) + end + ref = make_ref() - dest = {n, node} - send(dest, {:get_key, key, ref, Node.self()}) + send({n, node}, {:get_key, key, ref, Node.self()}) {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []} end + def handle_call({:get, key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_cast( {:put, key, value}, @ready, @@ -110,6 +125,8 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_cast({:put, key, value}, _, sl), do: {:ok, sl, []} + def handle_cast({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do Enum.each(keys, fn {key, value} -> DB.put(db, key, value) @@ -118,6 +135,8 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_cast({:sync, keys}, _, sl), do: {:ok, sl, []} + def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do val = DB.get(db, key) send({n, node}, {:reply, val, ref}) -- 2.45.3