]> Entropealabs - cluster_kv.git/commitdiff
use local node if available for gets
authorChristopher <chris@entropealabs.com>
Fri, 6 Mar 2020 23:47:51 +0000 (17:47 -0600)
committerChristopher <chris@entropealabs.com>
Fri, 6 Mar 2020 23:47:51 +0000 (17:47 -0600)
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index 628c91e2e5e401c20fb370057b2ac8156e9cd3a8..fcb7c522210a86ab55a98258b2af0b975a5854f1 100644 (file)
@@ -42,6 +42,7 @@ defmodule ClusterKV.DB do
   end
 
   def init(db_path) when is_list(db_path) do
+    Logger.info("Opening database at #{db_path}")
     {:ok, db} = :rocksdb.open(db_path, @db_options)
     {:ok, %DB{db: db}}
   end
index 701bc94d70dc934a67374c7d047cf498f3ce0965..2192fca07cfdcd6d8bf1094884a3280f32cce361 100644 (file)
@@ -12,6 +12,7 @@ defmodule ClusterKV.Ring do
     :quorum,
     :ring,
     :db,
+    :node,
     requests: [],
     anti_entropy_interval: 5 * 60_000
   ]
@@ -20,6 +21,7 @@ defmodule ClusterKV.Ring do
           replicas: integer(),
           quorum: integer(),
           db: module(),
+          node: node(),
           ring: HashRing.t(),
           anti_entropy_interval: integer()
         }
@@ -55,13 +57,17 @@ defmodule ClusterKV.Ring do
   end
 
   def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
+    node = Node.self()
+
     ring =
-      Node.self()
+      node
       |> HashRing.new()
       |> get_existing_nodes()
 
     :net_kernel.monitor_nodes(true, node_type: :all)
-    {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :initialized}]}
+
+    {:ok, %SL{sl | data: %Ring{data | node: node, ring: ring}},
+     [{:next_event, :internal, :initialized}]}
   end
 
   def handle_resource(@await_quorum, _, @quorum, %SL{data: %Ring{quorum: q, ring: r}} = sl) do
@@ -91,15 +97,24 @@ defmodule ClusterKV.Ring do
         {:get, key},
         from,
         @ready,
-        %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl
+        %SL{data: %Ring{name: n, node: node, requests: reqs, ring: r, replicas: repls} = data} =
+          sl
       ) do
-    node = HashRing.key_to_node(r, key)
+    nodes = HashRing.key_to_nodes(r, key, repls)
+
+    node =
+      case node in nodes do
+        true -> node
+        false -> Enum.random(nodes)
+      end
+
     ref = make_ref()
-    dest = {n, node}
-    send(dest, {:get_key, key, ref, Node.self()})
+    send({n, node}, {:get_key, key, ref, Node.self()})
     {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []}
   end
 
+  def handle_call({:get, key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+
   def handle_cast(
         {:put, key, value},
         @ready,
@@ -110,6 +125,8 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_cast({:put, key, value}, _, sl), do: {:ok, sl, []}
+
   def handle_cast({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
     Enum.each(keys, fn {key, value} ->
       DB.put(db, key, value)
@@ -118,6 +135,8 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_cast({:sync, keys}, _, sl), do: {:ok, sl, []}
+
   def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do
     val = DB.get(db, key)
     send({n, node}, {:reply, val, ref})