From 5a94bef7b8089e13d5afe0ee53e290d688033a43 Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 16 Mar 2020 13:59:39 -0500 Subject: [PATCH] fix wildcards to allow uris in realms --- lib/cluster_kv.ex | 1 + lib/cluster_kv/ring.ex | 32 +++++++++++++++++++++++++++++--- test/cluster_kv_test.exs | 31 +++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 64b3754..138ebd8 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -52,6 +52,7 @@ defmodule ClusterKV do defdelegate get(name, key, timeout \\ :infinity), to: Ring defdelegate put(name, key, value), to: Ring + defdelegate update(name, key, value, fun), to: Ring defdelegate batch(name, batch), to: Ring defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index ac6bc31..875e49a 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -44,6 +44,11 @@ defmodule ClusterKV.Ring do :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value}) end + @spec update(name :: module(), key :: String.t(), value :: any(), fun :: fun()) :: :ok + def update(name, key, value, fun) do + :gen_statem.cast(ClusterKV.ring_name(name), {:update, key, value, fun}) + end + @spec put_wildcard( name :: module(), header :: String.t(), @@ -202,9 +207,7 @@ defmodule ClusterKV.Ring do ) do ref = make_ref() - [_ | [parts]] = String.split(key, join) - - parts = String.split(parts, split_on) + parts = String.split(key, split_on) nodes = Enum.map(Enum.with_index(parts), fn {k, i} -> @@ -257,6 +260,16 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_cast( + {:update, key, value, fun}, + _, + %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl + ) do + nodes = HashRing.key_to_nodes(r, key, repls) + send_update(n, nodes, key, value, fun) + {:ok, sl, []} + end + def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []} def handle_cast(_, _, sl), do: {:ok, sl, []} @@ -266,6 +279,11 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_info({:update, key, value, fun}, _, %SL{data: %Ring{db: db}} = sl) do + DB.upsert(db, key, value, fun) + {:ok, sl, []} + end + def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do val = DB.get(db, key) send({n, node}, {:reply, val, ref, me}) @@ -412,6 +430,7 @@ defmodule ClusterKV.Ring do k = Enum.join([header, k, i], join) nodes = HashRing.key_to_nodes(ring, k, repls) send_sync(name, nodes, k, {parts, value}) + false end) end @@ -480,6 +499,13 @@ defmodule ClusterKV.Ring do end) end + @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok + defp send_update(name, nodes, key, value, fun) do + Enum.each(nodes, fn n -> + send({name, n}, {:update, key, value, fun}) + end) + end + @spec get_existing_nodes(ring :: HashRing.t()) :: HashRing.t() defp get_existing_nodes(ring) do Enum.reduce(Node.list(), ring, fn n, acc -> diff --git a/test/cluster_kv_test.exs b/test/cluster_kv_test.exs index a08e578..db73d39 100644 --- a/test/cluster_kv_test.exs +++ b/test/cluster_kv_test.exs @@ -9,14 +9,45 @@ defmodule ClusterKVTest do ] ] + @header "com.myrealm" + setup_all do db = ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1) [db: TestCluster] end test "test", %{db: db} do + me = {self(), self()} ClusterKV.put(db, "test1", :hello) + ClusterKV.put(db, "test1", :cruel) ClusterKV.put(db, "test1", :world) + + ClusterKV.put_wildcard( + db, + @header, + "com.test..temp", + {1_003_039_494, me}, + ".", + ":", + "" + ) + + assert {"test1", [:world, :cruel, :hello]} = ClusterKV.get(db, "test1") + + assert [{["com", "test", "", "temp"], {1_003_039_494, me}}] = + ClusterKV.wildcard( + db, + @header, + "com.test.data.temp", + ".", + ":", + "" + ) + + ClusterKV.update(db, "test1", :cruel, fn {id, values}, value -> + {id, List.delete(values, value)} + end) + assert {"test1", [:world, :hello]} = ClusterKV.get(db, "test1") end end -- 2.45.3