]> Entropealabs - cluster_kv.git/commitdiff
initial wildcard support, it ain't pretty...
authorChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 22:08:11 +0000 (16:08 -0600)
committerChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 22:08:11 +0000 (16:08 -0600)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index a8fe61bd6bfd889254eb1cf52fb29c54b9c6768b..a8a42fd4afb993627384b7d52bcea982c796b1f0 100644 (file)
@@ -44,6 +44,8 @@ defmodule ClusterKV do
 
   defdelegate get(name, key), to: Ring
   defdelegate put(name, key, value), to: Ring
+  defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring
   defdelegate prefix(name, key, split_on, min), to: Ring
   defdelegate stream(name), to: Ring
+  defdelegate wildcard(name, header, key, split_on, join, wildcard), to: Ring
 end
index 86e5a88223621ab5b4f41092054dc7407fdfd018..ffd69cc91cea33fdfbfd444d7a14d1c209d2825d 100644 (file)
@@ -22,6 +22,11 @@ defmodule ClusterKV.DB do
     GenServer.cast(name, {:put, key, value})
   end
 
+  @spec put_or_update(name :: module(), key :: binary(), value :: term()) :: :ok
+  def put_or_update(name, key, value) do
+    GenServer.cast(name, {:put_or_update, key, value})
+  end
+
   def stream(name) do
     GenServer.call(name, :stream)
   end
@@ -52,4 +57,14 @@ defmodule ClusterKV.DB do
     :ets.insert(db, {key, value})
     {:noreply, state}
   end
+
+  def handle_cast({:put_or_update, key, value}, %DB{db: db} = state) do
+    case :ets.lookup(db, key) do
+      [] -> :ets.insert(db, {key, value})
+      [{_, val}] when is_list(val) -> :ets.insert(db, {key, [value | val]})
+      [{_, val}] -> :ets.insert(db, {key, [value, val]})
+    end
+
+    {:noreply, state}
+  end
 end
index 0e6105082e4af23051ad757fe3eee5897c6a84ad..0cbdb34389ca0fb31e16e1fb45e13b1b48acdd41 100644 (file)
@@ -48,6 +48,13 @@ defmodule ClusterKV.Ring do
     :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
   end
 
+  def put_wildcard(name, header, key, value, split_on, join, wildcard) do
+    :gen_statem.cast(
+      ClusterKV.ring_name(name),
+      {:put_wildcard, header, key, value, split_on, join, wildcard}
+    )
+  end
+
   def get(name, key, timeout \\ 5000) do
     :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
   end
@@ -56,6 +63,13 @@ defmodule ClusterKV.Ring do
     :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
   end
 
+  def wildcard(name, header, key, split_on, join, wildcard) do
+    :gen_statem.call(
+      ClusterKV.ring_name(name),
+      {:wildcard, header, key, split_on, join, wildcard}
+    )
+  end
+
   def stream(name) do
     :gen_statem.call(ClusterKV.ring_name(name), :stream)
   end
@@ -144,6 +158,53 @@ defmodule ClusterKV.Ring do
 
   def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
+  def handle_call(
+        {:wildcard, header, key, split_on, join, wildcard},
+        from,
+        @ready,
+        %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
+      ) do
+    ref = make_ref()
+
+    [_ | [parts]] = String.split(key, join)
+
+    parts = String.split(parts, split_on)
+
+    Enum.each(Enum.with_index(parts), fn {k, i} ->
+      k = Enum.join([header, k, i], join)
+      get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
+    end)
+
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(parts), []} | reqs]}}, []}
+  end
+
+  def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+
+  def handle_cast(
+        {:put_wildcard, header, key, value, split_on, join, wildcard},
+        @ready,
+        %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+      ) do
+    parts =
+      key
+      |> String.split(split_on)
+
+    Enum.take_while(Enum.with_index(parts), fn {k, i} ->
+      case k == wildcard do
+        true ->
+          true
+
+        false ->
+          k = Enum.join([header, k, i], join)
+          nodes = HashRing.key_to_nodes(r, k, repls)
+          send_sync(n, nodes, k, {parts, value})
+          false
+      end
+    end)
+
+    {:ok, sl, []}
+  end
+
   def handle_cast(
         {:put, key, value},
         @ready,
@@ -156,9 +217,11 @@ defmodule ClusterKV.Ring do
 
   def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []}
 
+  def handle_cast(_, _, sl), do: {:ok, sl, []}
+
   def handle_info({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
     Enum.each(keys, fn {key, value} ->
-      DB.put(db, key, value)
+      DB.put_or_update(db, key, value)
     end)
 
     {:ok, sl, []}
@@ -172,6 +235,50 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_info(
+        {:get_wildcard, key, parts, wildcard, ref, node},
+        @ready,
+        %SL{data: %Ring{name: n, db: db}} = sl
+      ) do
+    vals = DB.get(db, key)
+
+    vals =
+      case vals do
+        :not_found ->
+          :not_found
+
+        vals ->
+          parts = Enum.with_index(parts)
+          {_k, vals} = vals
+
+          vals =
+            case vals do
+              v when is_list(v) -> v
+              v -> [v]
+            end
+
+          Enum.filter(vals, fn {p, _val} ->
+            Enum.all?(parts, fn {k, i} ->
+              case Enum.at(p, i) do
+                ^k -> true
+                w when w == wildcard -> true
+                _ -> false
+              end
+            end)
+          end)
+      end
+
+    vals =
+      case vals do
+        [] -> :not_found
+        [o] -> o
+        o -> o
+      end
+
+    send({n, node}, {:reply, vals, ref})
+    {:ok, sl, []}
+  end
+
   def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
     {requests, actions} = maybe_reply(ref, reqs, val)
     {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
@@ -205,6 +312,11 @@ defmodule ClusterKV.Ring do
     send({n, node}, {:get_key, prefix, ref, me})
   end
 
+  defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do
+    node = get_node(key, r, me, repls)
+    send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
+  end
+
   defp get_node(key, r, node, repls) do
     nodes = HashRing.key_to_nodes(r, key, repls)