]> Entropealabs - cluster_kv.git/commitdiff
fix wildcards to allow uris in realms
authorChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 18:59:39 +0000 (13:59 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 18:59:39 +0000 (13:59 -0500)
lib/cluster_kv.ex
lib/cluster_kv/ring.ex
test/cluster_kv_test.exs

index 64b3754fc93e5bde23df732190da1f72713ca5f6..138ebd8a8978af0f5de08542a9c991d6dd26b864 100644 (file)
@@ -52,6 +52,7 @@ defmodule ClusterKV do
 
   defdelegate get(name, key, timeout \\ :infinity), to: Ring
   defdelegate put(name, key, value), to: Ring
+  defdelegate update(name, key, value, fun), to: Ring
   defdelegate batch(name, batch), to: Ring
   defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring
   defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring
index ac6bc31a7e9c4cddb812a10c52e95bddc87e0581..875e49a685d1ca6ee3d1a8c89612c314f0ad3cd8 100644 (file)
@@ -44,6 +44,11 @@ defmodule ClusterKV.Ring do
     :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
   end
 
+  @spec update(name :: module(), key :: String.t(), value :: any(), fun :: fun()) :: :ok
+  def update(name, key, value, fun) do
+    :gen_statem.cast(ClusterKV.ring_name(name), {:update, key, value, fun})
+  end
+
   @spec put_wildcard(
           name :: module(),
           header :: String.t(),
@@ -202,9 +207,7 @@ defmodule ClusterKV.Ring do
       ) do
     ref = make_ref()
 
-    [_ | [parts]] = String.split(key, join)
-
-    parts = String.split(parts, split_on)
+    parts = String.split(key, split_on)
 
     nodes =
       Enum.map(Enum.with_index(parts), fn {k, i} ->
@@ -257,6 +260,16 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_cast(
+        {:update, key, value, fun},
+        _,
+        %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+      ) do
+    nodes = HashRing.key_to_nodes(r, key, repls)
+    send_update(n, nodes, key, value, fun)
+    {:ok, sl, []}
+  end
+
   def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []}
 
   def handle_cast(_, _, sl), do: {:ok, sl, []}
@@ -266,6 +279,11 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_info({:update, key, value, fun}, _, %SL{data: %Ring{db: db}} = sl) do
+    DB.upsert(db, key, value, fun)
+    {:ok, sl, []}
+  end
+
   def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do
     val = DB.get(db, key)
     send({n, node}, {:reply, val, ref, me})
@@ -412,6 +430,7 @@ defmodule ClusterKV.Ring do
         k = Enum.join([header, k, i], join)
         nodes = HashRing.key_to_nodes(ring, k, repls)
         send_sync(name, nodes, k, {parts, value})
+        false
     end)
   end
 
@@ -480,6 +499,13 @@ defmodule ClusterKV.Ring do
     end)
   end
 
+  @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok
+  defp send_update(name, nodes, key, value, fun) do
+    Enum.each(nodes, fn n ->
+      send({name, n}, {:update, key, value, fun})
+    end)
+  end
+
   @spec get_existing_nodes(ring :: HashRing.t()) :: HashRing.t()
   defp get_existing_nodes(ring) do
     Enum.reduce(Node.list(), ring, fn n, acc ->
index a08e5787631ed26f2a193bc86ffc6ad909e0355a..db73d396c50b1927243107c424fda87044b3ea5e 100644 (file)
@@ -9,14 +9,45 @@ defmodule ClusterKVTest do
     ]
   ]
 
+  @header "com.myrealm"
+
   setup_all do
     db = ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1)
     [db: TestCluster]
   end
 
   test "test", %{db: db} do
+    me = {self(), self()}
     ClusterKV.put(db, "test1", :hello)
+    ClusterKV.put(db, "test1", :cruel)
     ClusterKV.put(db, "test1", :world)
+
+    ClusterKV.put_wildcard(
+      db,
+      @header,
+      "com.test..temp",
+      {1_003_039_494, me},
+      ".",
+      ":",
+      ""
+    )
+
+    assert {"test1", [:world, :cruel, :hello]} = ClusterKV.get(db, "test1")
+
+    assert [{["com", "test", "", "temp"], {1_003_039_494, me}}] =
+             ClusterKV.wildcard(
+               db,
+               @header,
+               "com.test.data.temp",
+               ".",
+               ":",
+               ""
+             )
+
+    ClusterKV.update(db, "test1", :cruel, fn {id, values}, value ->
+      {id, List.delete(values, value)}
+    end)
+
     assert {"test1", [:world, :hello]} = ClusterKV.get(db, "test1")
   end
 end