From: Christopher Date: Tue, 17 Mar 2020 18:58:08 +0000 (-0500) Subject: add keyspace to external API X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=b169f82084f1218134407ab745b0734bbc5ef69c;p=cluster_kv.git add keyspace to external API --- diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 138ebd8..dd78437 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -50,14 +50,14 @@ defmodule ClusterKV do def ring_name(name), do: Module.concat([name, Ring]) def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor]) - defdelegate get(name, key, timeout \\ :infinity), to: Ring - defdelegate put(name, key, value), to: Ring - defdelegate update(name, key, value, fun), to: Ring - defdelegate batch(name, batch), to: Ring - defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring - defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring + defdelegate get(name, keyspace, key, timeout \\ :infinity), to: Ring + defdelegate put(name, keyspace, key, value), to: Ring + defdelegate update(name, keyspace, key, value, fun), to: Ring + defdelegate batch(name, keyspace, batch), to: Ring + defdelegate put_wildcard(name, keyspace, key, value, split_on, join, wildcard), to: Ring + defdelegate prefix(name, keyspace, key, split_on, min, timeout \\ :infinity), to: Ring defdelegate stream(name, timeout \\ :infinity), to: Ring - defdelegate wildcard(name, header, key, split_on, join, wildcard, timeout \\ :infinity), + defdelegate wildcard(name, keyspace, key, split_on, join, wildcard, timeout \\ :infinity), to: Ring end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 875e49a..7733775 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -39,67 +39,79 @@ defmodule ClusterKV.Ring do @handle_node_up "HandleNodeUp" @handle_node_down "HandleNodeDown" - @spec put(name :: module(), key :: String.t(), value :: term()) :: :ok - def put(name, key, value) do - :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value}) + @spec put(name :: module(), keyspace :: String.t(), key :: String.t(), value :: term()) :: :ok + def put(name, keyspace, key, value) do + :gen_statem.cast(ClusterKV.ring_name(name), {:put, keyspace, key, value}) end - @spec update(name :: module(), key :: String.t(), value :: any(), fun :: fun()) :: :ok - def update(name, key, value, fun) do - :gen_statem.cast(ClusterKV.ring_name(name), {:update, key, value, fun}) + @spec update( + name :: module(), + keyspace :: String.t(), + key :: String.t(), + value :: any(), + fun :: fun() + ) :: :ok + def update(name, keyspace, key, value, fun) do + :gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun}) end @spec put_wildcard( name :: module(), - header :: String.t(), + keyspace :: String.t(), key :: String.t(), value :: term(), split_on :: String.t(), join :: String.t(), wildcard :: String.t() ) :: :ok - def put_wildcard(name, header, key, value, split_on, join, wildcard) do + def put_wildcard(name, keyspace, key, value, split_on, join, wildcard) do :gen_statem.cast( ClusterKV.ring_name(name), - {:put_wildcard, header, key, value, split_on, join, wildcard} + {:put_wildcard, keyspace, key, value, split_on, join, wildcard} ) end - @spec batch(name :: module(), batch :: [DB.element()]) :: :ok - def batch(name, batch) do - :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch}) + @spec batch(name :: module(), keyspace :: String.t(), batch :: [DB.element()]) :: :ok + def batch(name, keyspace, batch) do + :gen_statem.cast(ClusterKV.ring_name(name), {:batch, keyspace, batch}) end - @spec get(name :: module(), key :: String.t(), timeout :: non_neg_integer() | :infinity) :: + @spec get( + name :: module(), + keyspace :: String.t(), + key :: String.t(), + timeout :: non_neg_integer() | :infinity + ) :: DB.element() | :not_found - def get(name, key, timeout) do - :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout) + def get(name, keyspace, key, timeout) do + :gen_statem.call(ClusterKV.ring_name(name), {:get, keyspace, key}, timeout) end @spec prefix( name :: module(), + keyspace :: String.t(), key :: String.t(), split_on :: String.t(), min :: integer(), timeout :: non_neg_integer() | :infinity ) :: [DB.element()] - def prefix(name, key, split_on, min, timeout) do - :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}, timeout) + def prefix(name, keyspace, key, split_on, min, timeout) do + :gen_statem.call(ClusterKV.ring_name(name), {:prefix, keyspace, key, split_on, min}, timeout) end @spec wildcard( name :: module(), - header :: String.t(), + keyspace :: String.t(), key :: String.t(), split_on :: String.t(), join :: String.t(), wildcard :: String.t(), timeout :: non_neg_integer() | :infinity ) :: [DB.element()] - def wildcard(name, header, key, split_on, join, wildcard, timeout) do + def wildcard(name, keyspace, key, split_on, join, wildcard, timeout) do :gen_statem.call( ClusterKV.ring_name(name), - {:wildcard, header, key, split_on, join, wildcard}, + {:wildcard, keyspace, key, split_on, join, wildcard}, timeout ) end @@ -162,26 +174,29 @@ defmodule ClusterKV.Ring do end def handle_call( - {:get, key}, + {:get, keyspace, key}, from, @ready, %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl ) do + key = "#{keyspace}:#{key}" node = get_node(key, r, me, repls) ref = make_ref() send({n, node}, {:get_key, key, ref, me}) {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []} end - def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_call({:get, _keyspace, _key}, from, _, sl), + do: {:ok, sl, [{:reply, from, :no_quorum}]} def handle_call( - {:prefix, key, split_on, min}, + {:prefix, keyspace, key, split_on, min}, from, @ready, %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl ) do ref = make_ref() + key = "#{keyspace}:#{key}" parts = String.split(key, split_on) head = Enum.slice(parts, 0..(min - 1)) itr = Enum.slice(parts, min..-2) @@ -197,10 +212,10 @@ defmodule ClusterKV.Ring do {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []} end - def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_call({:prefix, _, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} def handle_call( - {:wildcard, header, key, split_on, join, wildcard}, + {:wildcard, keyspace, key, split_on, join, wildcard}, from, @ready, %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl @@ -211,7 +226,7 @@ defmodule ClusterKV.Ring do nodes = Enum.map(Enum.with_index(parts), fn {k, i} -> - k = Enum.join([header, k, i], join) + k = Enum.join([keyspace, k, i], join) get_wildcard(k, parts, wildcard, r, me, repls, n, ref) end) @@ -221,16 +236,18 @@ defmodule ClusterKV.Ring do def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} def handle_cast( - {:batch, batch}, + {:batch, keyspace, batch}, _, %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl ) do batches = - Enum.reduce(batch, %{}, fn {k, _v} = p, acc -> + Enum.reduce(batch, %{}, fn {k, v}, acc -> + k = "#{keyspace}:#{k}" nodes = HashRing.key_to_nodes(r, k, repls) + el = {k, v} Enum.reduce(nodes, acc, fn n, a -> - Map.update(a, n, [p], &[p | &1]) + Map.update(a, n, [el], &[el | &1]) end) end) @@ -239,38 +256,40 @@ defmodule ClusterKV.Ring do end def handle_cast( - {:put_wildcard, header, key, value, split_on, join, wildcard}, + {:put_wildcard, keyspace, key, value, split_on, join, wildcard}, _, %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl ) do key |> String.split(split_on) - |> send_wildcard(header, wildcard, join, value, n, r, repls) + |> send_wildcard(keyspace, wildcard, join, value, n, r, repls) {:ok, sl, []} end def handle_cast( - {:put, key, value}, + {:put, keyspace, key, value}, _, %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl ) do + key = "#{keyspace}:#{key}" nodes = HashRing.key_to_nodes(r, key, repls) send_sync(n, nodes, key, value) {:ok, sl, []} end def handle_cast( - {:update, key, value, fun}, + {:update, keyspace, key, value, fun}, _, %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl ) do + key = "#{keyspace}:#{key}" nodes = HashRing.key_to_nodes(r, key, repls) send_update(n, nodes, key, value, fun) {:ok, sl, []} end - def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []} + def handle_cast({:put, _keyspace, _key, _value}, _, sl), do: {:ok, sl, []} def handle_cast(_, _, sl), do: {:ok, sl, []} @@ -285,8 +304,7 @@ defmodule ClusterKV.Ring do end def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do - val = DB.get(db, key) - send({n, node}, {:reply, val, ref, me}) + send({n, node}, {:reply, get_key(db, key), ref, me}) {:ok, sl, []} end @@ -296,7 +314,7 @@ defmodule ClusterKV.Ring do %SL{data: %Ring{name: n, db: db, node: me}} = sl ) do val = - case DB.get(db, key) do + case get_key(db, key) do :not_found -> :not_found @@ -383,6 +401,17 @@ defmodule ClusterKV.Ring do node end + def get_key(db, key) do + case DB.get(db, key) do + :not_found -> + :not_found + + {id, val} -> + [_keyspace, key] = String.split(id, ":", parts: 2) + {key, val} + end + end + @spec get_wildcard( key :: String.t(), parts :: [String.t()], @@ -492,7 +521,12 @@ defmodule ClusterKV.Ring do _ -> true end) - @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok + @spec send_sync( + name :: module(), + nodes :: [node()], + key :: {String.t(), String.t()}, + value :: any() + ) :: :ok defp send_sync(name, nodes, key, value) do Enum.each(nodes, fn n -> send({name, n}, {:sync, key, value}) diff --git a/test/cluster_kv_test.exs b/test/cluster_kv_test.exs index db73d39..8661262 100644 --- a/test/cluster_kv_test.exs +++ b/test/cluster_kv_test.exs @@ -9,22 +9,22 @@ defmodule ClusterKVTest do ] ] - @header "com.myrealm" + @keyspace "com.myrealm" setup_all do - db = ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1) + ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1) [db: TestCluster] end test "test", %{db: db} do me = {self(), self()} - ClusterKV.put(db, "test1", :hello) - ClusterKV.put(db, "test1", :cruel) - ClusterKV.put(db, "test1", :world) + ClusterKV.put(db, @keyspace, "test1:test", :hello) + ClusterKV.put(db, @keyspace, "test1:test", :cruel) + ClusterKV.put(db, @keyspace, "test1:test", :world) ClusterKV.put_wildcard( db, - @header, + @keyspace, "com.test..temp", {1_003_039_494, me}, ".", @@ -32,22 +32,22 @@ defmodule ClusterKVTest do "" ) - assert {"test1", [:world, :cruel, :hello]} = ClusterKV.get(db, "test1") + assert {"test1:test", [:world, :cruel, :hello]} = ClusterKV.get(db, @keyspace, "test1:test") assert [{["com", "test", "", "temp"], {1_003_039_494, me}}] = ClusterKV.wildcard( db, - @header, + @keyspace, "com.test.data.temp", ".", ":", "" ) - ClusterKV.update(db, "test1", :cruel, fn {id, values}, value -> + ClusterKV.update(db, @keyspace, "test1:test", :cruel, fn {id, values}, value -> {id, List.delete(values, value)} end) - assert {"test1", [:world, :hello]} = ClusterKV.get(db, "test1") + assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test") end end