]> Entropealabs - cluster_kv.git/commitdiff
adds prefix lookup
authorChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 02:46:21 +0000 (20:46 -0600)
committerChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 02:46:21 +0000 (20:46 -0600)
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index fcb7c522210a86ab55a98258b2af0b975a5854f1..f49cc399c07093e5b88baca6128fce48251480b0 100644 (file)
@@ -55,10 +55,7 @@ defmodule ClusterKV.DB do
   end
 
   def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do
-    {:reply,
-     db
-     |> :rocksdb.get(prefix, [])
-     |> deserialize(), state}
+    {:reply, seek_iterator(db, prefix), state}
   end
 
   def handle_cast({:put, key, value}, %DB{db: db} = state) do
@@ -66,6 +63,29 @@ defmodule ClusterKV.DB do
     {:noreply, state}
   end
 
+  def seek_iterator(db, prefix, acc \\ []) do
+    {:ok, itr} = :rocksdb.iterator(db, [])
+    seek_loop(:rocksdb.iterator_move(itr, {:seek, prefix}), prefix, itr, acc)
+  end
+
+  def seek_loop({:error, :iterator_closed}, _pre, _itr, acc), do: acc
+  def seek_loop({:error, :invalid_iterator}, _pre, _itr, acc), do: acc
+
+  def seek_loop({:ok, key, val}, prefix, itr, acc) do
+    case String.starts_with?(key, prefix) do
+      true ->
+        seek_loop(
+          :rocksdb.iterator_move(itr, :next),
+          prefix,
+          itr,
+          [{key, :erlang.binary_to_term(val)} | acc]
+        )
+
+      false ->
+        acc
+    end
+  end
+
   def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)}
   def deserialize(other), do: other
 
index a861ca583ae39b72ba86f5cd58b81e07552d60e3..160281c98f71757d85e1af733031350395a5be52 100644 (file)
@@ -109,12 +109,30 @@ defmodule ClusterKV.Ring do
       end
 
     ref = make_ref()
-    send({n, node}, {:get_key, key, ref, Node.self()})
-    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []}
+    send({n, node}, {:get_key, key, ref, node})
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []}
   end
 
   def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
+  def handle_call(
+        {:prefix, prefix},
+        from,
+        @ready,
+        %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl
+      ) do
+    nodes = HashRing.nodes(r)
+    ref = make_ref()
+
+    Enum.each(nodes, fn node ->
+      send({n, node}, {:get_prefix, prefix, ref, node})
+    end)
+
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(nodes), []} | reqs]}}, []}
+  end
+
+  def handle_call({:prefix, _prefix}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+
   def handle_cast(
         {:put, key, value},
         @ready,
@@ -143,11 +161,19 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
-  def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
-    {_, from} = get_request(ref, reqs)
+  def handle_info(
+        {:get_prefix, prefix, ref, node},
+        @ready,
+        %SL{data: %Ring{name: n, db: db}} = sl
+      ) do
+    val = DB.get_prefix(db, prefix)
+    send({n, node}, {:reply, val, ref})
+    {:ok, sl, []}
+  end
 
-    {:ok, %SL{sl | data: %Ring{data | requests: remove_request(ref, reqs)}},
-     [{:reply, from, val}]}
+  def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
+    {requests, actions} = maybe_reply(ref, reqs, val)
+    {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
   end
 
   def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
@@ -164,17 +190,39 @@ defmodule ClusterKV.Ring do
      [{:next_event, :internal, :node_down}]}
   end
 
+  defp maybe_reply(ref, reqs, val) do
+    case get_request(ref, reqs) do
+      {_, from, 1, res} ->
+        {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+
+      _ ->
+        {decrement_request(ref, reqs, val), []}
+    end
+  end
+
+  defp get_final_value([], val), do: val
+
+  defp get_final_value(res, val) when is_list(res),
+    do: {:ok, MapSet.new(List.flatten([val | res]))}
+
   defp get_request(ref, reqs) do
     Enum.find(reqs, fn
-      {^ref, _} -> true
+      {^ref, _, _, _} -> true
       _ -> false
     end)
   end
 
+  defp decrement_request(ref, reqs, val) do
+    Enum.map(reqs, fn
+      {^ref, f, c, res} -> {ref, f, c - 1, [val | res]}
+      other -> other
+    end)
+  end
+
   defp remove_request(ref, requests) do
     Enum.filter(requests, fn
-      {^ref, _} -> false
-      {_, _} -> true
+      {^ref, _, _, _} -> false
+      {_, _, _, _} -> true
     end)
   end