]> Entropealabs - cluster_kv.git/commitdiff
track nodes in requests so we can remove from current requests if node goes down
authorChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 14:31:40 +0000 (09:31 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 14:31:40 +0000 (09:31 -0500)
lib/cluster_kv/ring.ex

index e2442c9743ecfece458a142cfead7029cb316df0..f189fc0ca4b3ff41b1454698002a1b78736cf93a 100644 (file)
@@ -136,7 +136,7 @@ defmodule ClusterKV.Ring do
     node = get_node(key, r, me, repls)
     ref = make_ref()
     send({n, node}, {:get_key, key, ref, me})
-    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []}
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
   end
 
   def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
@@ -151,15 +151,16 @@ defmodule ClusterKV.Ring do
     parts = String.split(key, split_on)
     head = Enum.slice(parts, 0..(min - 1))
     itr = Enum.slice(parts, min..-2)
-    get_prefix(head, r, me, repls, n, ref)
+    node = get_prefix(head, r, me, repls, n, ref)
 
-    Enum.reduce(itr, head, fn ns, acc ->
-      next = acc ++ [ns]
-      get_prefix(next, r, me, repls, n, ref)
-      next
-    end)
+    {_, nodes} =
+      Enum.reduce(itr, {head, [node]}, fn ns, {acc, nodes} ->
+        next = acc ++ [ns]
+        node = get_prefix(next, r, me, repls, n, ref)
+        {next, [node | nodes]}
+      end)
 
-    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(itr) + 1, []} | reqs]}}, []}
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []}
   end
 
   def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
@@ -176,12 +177,13 @@ defmodule ClusterKV.Ring do
 
     parts = String.split(parts, split_on)
 
-    Enum.each(Enum.with_index(parts), fn {k, i} ->
-      k = Enum.join([header, k, i], join)
-      get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
-    end)
+    nodes =
+      Enum.map(Enum.with_index(parts), fn {k, i} ->
+        k = Enum.join([header, k, i], join)
+        get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
+      end)
 
-    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(parts), []} | reqs]}}, []}
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []}
   end
 
   def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
@@ -235,16 +237,16 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
-  def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db}} = sl) do
+  def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do
     val = DB.get(db, key)
-    send({n, node}, {:reply, val, ref})
+    send({n, node}, {:reply, val, ref, me})
     {:ok, sl, []}
   end
 
   def handle_info(
         {:get_wildcard, key, parts, wildcard, ref, node},
         @ready,
-        %SL{data: %Ring{name: n, db: db}} = sl
+        %SL{data: %Ring{name: n, db: db, node: me}} = sl
       ) do
     val =
       case DB.get(db, key) do
@@ -265,12 +267,12 @@ defmodule ClusterKV.Ring do
           end)
       end
 
-    send({n, node}, {:reply, val, ref})
+    send({n, node}, {:reply, val, ref, me})
     {:ok, sl, []}
   end
 
-  def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
-    {requests, actions} = maybe_reply(ref, reqs, val)
+  def handle_info({:reply, val, ref, from}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
+    {requests, actions} = maybe_reply(ref, reqs, val, from)
     {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
   end
 
@@ -288,12 +290,24 @@ defmodule ClusterKV.Ring do
   def handle_info(
         {:nodedown, node, info},
         _,
-        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+        %SL{
+          data: %Ring{name: n, requests: reqs, node: me, db: db, ring: r, replicas: repls} = data
+        } = sl
       ) do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.remove_node(r, node)
     redistribute_data(n, me, db, r, ring, repls)
-    {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
+    reqs = remove_node_from_requests(reqs, node)
+
+    {requests, actions} =
+      Enum.reduce(reqs, {reqs, [{:next_event, :internal, :node_down}]}, fn {ref, _fr, _nodes,
+                                                                            _res},
+                                                                           {rs, acts} ->
+        {requests, actions} = maybe_reply(ref, rs, :not_found, node)
+        {requests, actions ++ acts}
+      end)
+
+    {:ok, %SL{sl | data: %Ring{data | ring: ring, requests: requests}}, actions}
   end
 
   def handle_info({:handle_batch, batch}, _, %SL{data: %Ring{db: db}} = sl) do
@@ -311,18 +325,20 @@ defmodule ClusterKV.Ring do
     prefix = Enum.join(next, ".")
     node = get_node(prefix, r, me, repls)
     send({n, node}, {:get_key, prefix, ref, me})
+    node
   end
 
   defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do
     node = get_node(key, r, me, repls)
     send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
+    node
   end
 
   def send_batch(name, batches) do
-    Logger.info("Sending batches to: #{inspect(Map.keys(batches))}")
+    Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}")
 
     Enum.each(batches, fn {node, batch} ->
-      Logger.info("Sending #{length(batch)} reccords to #{inspect(node)}")
+      Logger.debug("Sending #{length(batch)} reccords to #{inspect(node)}")
       send({name, node}, {:handle_batch, batch})
     end)
   end
@@ -364,13 +380,16 @@ defmodule ClusterKV.Ring do
     end)
   end
 
-  defp maybe_reply(ref, reqs, val) do
+  defp maybe_reply(ref, reqs, val, node) do
     case get_request(ref, reqs) do
-      {_, from, 1, res} ->
+      {_, from, [_], res} ->
+        {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+
+      {_, from, [], res} ->
         {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
 
       _ ->
-        {decrement_request(ref, reqs, val), []}
+        {decrement_request(ref, reqs, val, node), []}
     end
   end
 
@@ -392,9 +411,15 @@ defmodule ClusterKV.Ring do
     end)
   end
 
-  defp decrement_request(ref, reqs, val) do
+  defp remove_node_from_requests(reqs, node) do
+    Enum.map(reqs, fn {ref, fr, nodes, res} ->
+      {ref, fr, List.delete(nodes, node), res}
+    end)
+  end
+
+  defp decrement_request(ref, reqs, val, from) do
     Enum.map(reqs, fn
-      {^ref, f, c, res} -> {ref, f, c - 1, [val | res]}
+      {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]}
       other -> other
     end)
   end