]> Entropealabs - cluster_kv.git/commitdiff
add keyspace to external API
authorChristopher <chris@entropealabs.com>
Tue, 17 Mar 2020 18:58:08 +0000 (13:58 -0500)
committerChristopher <chris@entropealabs.com>
Tue, 17 Mar 2020 18:58:08 +0000 (13:58 -0500)
lib/cluster_kv.ex
lib/cluster_kv/ring.ex
test/cluster_kv_test.exs

index 138ebd8a8978af0f5de08542a9c991d6dd26b864..dd784374a41826de065e829d9ff5ac7cd969211a 100644 (file)
@@ -50,14 +50,14 @@ defmodule ClusterKV do
   def ring_name(name), do: Module.concat([name, Ring])
   def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor])
 
-  defdelegate get(name, key, timeout \\ :infinity), to: Ring
-  defdelegate put(name, key, value), to: Ring
-  defdelegate update(name, key, value, fun), to: Ring
-  defdelegate batch(name, batch), to: Ring
-  defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring
-  defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring
+  defdelegate get(name, keyspace, key, timeout \\ :infinity), to: Ring
+  defdelegate put(name, keyspace, key, value), to: Ring
+  defdelegate update(name, keyspace, key, value, fun), to: Ring
+  defdelegate batch(name, keyspace, batch), to: Ring
+  defdelegate put_wildcard(name, keyspace, key, value, split_on, join, wildcard), to: Ring
+  defdelegate prefix(name, keyspace, key, split_on, min, timeout \\ :infinity), to: Ring
   defdelegate stream(name, timeout \\ :infinity), to: Ring
 
-  defdelegate wildcard(name, header, key, split_on, join, wildcard, timeout \\ :infinity),
+  defdelegate wildcard(name, keyspace, key, split_on, join, wildcard, timeout \\ :infinity),
     to: Ring
 end
index 875e49a685d1ca6ee3d1a8c89612c314f0ad3cd8..7733775ea22b40389525e6b6fb89280e8c10bfeb 100644 (file)
@@ -39,67 +39,79 @@ defmodule ClusterKV.Ring do
   @handle_node_up "HandleNodeUp"
   @handle_node_down "HandleNodeDown"
 
-  @spec put(name :: module(), key :: String.t(), value :: term()) :: :ok
-  def put(name, key, value) do
-    :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
+  @spec put(name :: module(), keyspace :: String.t(), key :: String.t(), value :: term()) :: :ok
+  def put(name, keyspace, key, value) do
+    :gen_statem.cast(ClusterKV.ring_name(name), {:put, keyspace, key, value})
   end
 
-  @spec update(name :: module(), key :: String.t(), value :: any(), fun :: fun()) :: :ok
-  def update(name, key, value, fun) do
-    :gen_statem.cast(ClusterKV.ring_name(name), {:update, key, value, fun})
+  @spec update(
+          name :: module(),
+          keyspace :: String.t(),
+          key :: String.t(),
+          value :: any(),
+          fun :: fun()
+        ) :: :ok
+  def update(name, keyspace, key, value, fun) do
+    :gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun})
   end
 
   @spec put_wildcard(
           name :: module(),
-          header :: String.t(),
+          keyspace :: String.t(),
           key :: String.t(),
           value :: term(),
           split_on :: String.t(),
           join :: String.t(),
           wildcard :: String.t()
         ) :: :ok
-  def put_wildcard(name, header, key, value, split_on, join, wildcard) do
+  def put_wildcard(name, keyspace, key, value, split_on, join, wildcard) do
     :gen_statem.cast(
       ClusterKV.ring_name(name),
-      {:put_wildcard, header, key, value, split_on, join, wildcard}
+      {:put_wildcard, keyspace, key, value, split_on, join, wildcard}
     )
   end
 
-  @spec batch(name :: module(), batch :: [DB.element()]) :: :ok
-  def batch(name, batch) do
-    :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch})
+  @spec batch(name :: module(), keyspace :: String.t(), batch :: [DB.element()]) :: :ok
+  def batch(name, keyspace, batch) do
+    :gen_statem.cast(ClusterKV.ring_name(name), {:batch, keyspace, batch})
   end
 
-  @spec get(name :: module(), key :: String.t(), timeout :: non_neg_integer() | :infinity) ::
+  @spec get(
+          name :: module(),
+          keyspace :: String.t(),
+          key :: String.t(),
+          timeout :: non_neg_integer() | :infinity
+        ) ::
           DB.element() | :not_found
-  def get(name, key, timeout) do
-    :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
+  def get(name, keyspace, key, timeout) do
+    :gen_statem.call(ClusterKV.ring_name(name), {:get, keyspace, key}, timeout)
   end
 
   @spec prefix(
           name :: module(),
+          keyspace :: String.t(),
           key :: String.t(),
           split_on :: String.t(),
           min :: integer(),
           timeout :: non_neg_integer() | :infinity
         ) :: [DB.element()]
-  def prefix(name, key, split_on, min, timeout) do
-    :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}, timeout)
+  def prefix(name, keyspace, key, split_on, min, timeout) do
+    :gen_statem.call(ClusterKV.ring_name(name), {:prefix, keyspace, key, split_on, min}, timeout)
   end
 
   @spec wildcard(
           name :: module(),
-          header :: String.t(),
+          keyspace :: String.t(),
           key :: String.t(),
           split_on :: String.t(),
           join :: String.t(),
           wildcard :: String.t(),
           timeout :: non_neg_integer() | :infinity
         ) :: [DB.element()]
-  def wildcard(name, header, key, split_on, join, wildcard, timeout) do
+  def wildcard(name, keyspace, key, split_on, join, wildcard, timeout) do
     :gen_statem.call(
       ClusterKV.ring_name(name),
-      {:wildcard, header, key, split_on, join, wildcard},
+      {:wildcard, keyspace, key, split_on, join, wildcard},
       timeout
     )
   end
@@ -162,26 +174,29 @@ defmodule ClusterKV.Ring do
   end
 
   def handle_call(
-        {:get, key},
+        {:get, keyspace, key},
         from,
         @ready,
         %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
       ) do
+    key = "#{keyspace}:#{key}"
     node = get_node(key, r, me, repls)
     ref = make_ref()
     send({n, node}, {:get_key, key, ref, me})
     {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
   end
 
-  def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+  def handle_call({:get, _keyspace, _key}, from, _, sl),
+    do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
   def handle_call(
-        {:prefix, key, split_on, min},
+        {:prefix, keyspace, key, split_on, min},
         from,
         @ready,
         %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
       ) do
     ref = make_ref()
+    key = "#{keyspace}:#{key}"
     parts = String.split(key, split_on)
     head = Enum.slice(parts, 0..(min - 1))
     itr = Enum.slice(parts, min..-2)
@@ -197,10 +212,10 @@ defmodule ClusterKV.Ring do
     {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []}
   end
 
-  def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+  def handle_call({:prefix, _, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
   def handle_call(
-        {:wildcard, header, key, split_on, join, wildcard},
+        {:wildcard, keyspace, key, split_on, join, wildcard},
         from,
         @ready,
         %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
@@ -211,7 +226,7 @@ defmodule ClusterKV.Ring do
 
     nodes =
       Enum.map(Enum.with_index(parts), fn {k, i} ->
-        k = Enum.join([header, k, i], join)
+        k = Enum.join([keyspace, k, i], join)
         get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
       end)
 
@@ -221,16 +236,18 @@ defmodule ClusterKV.Ring do
   def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
   def handle_cast(
-        {:batch, batch},
+        {:batch, keyspace, batch},
         _,
         %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
       ) do
     batches =
-      Enum.reduce(batch, %{}, fn {k, _v} = p, acc ->
+      Enum.reduce(batch, %{}, fn {k, v}, acc ->
+        k = "#{keyspace}:#{k}"
         nodes = HashRing.key_to_nodes(r, k, repls)
+        el = {k, v}
 
         Enum.reduce(nodes, acc, fn n, a ->
-          Map.update(a, n, [p], &[p | &1])
+          Map.update(a, n, [el], &[el | &1])
         end)
       end)
 
@@ -239,38 +256,40 @@ defmodule ClusterKV.Ring do
   end
 
   def handle_cast(
-        {:put_wildcard, header, key, value, split_on, join, wildcard},
+        {:put_wildcard, keyspace, key, value, split_on, join, wildcard},
         _,
         %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
       ) do
     key
     |> String.split(split_on)
-    |> send_wildcard(header, wildcard, join, value, n, r, repls)
+    |> send_wildcard(keyspace, wildcard, join, value, n, r, repls)
 
     {:ok, sl, []}
   end
 
   def handle_cast(
-        {:put, key, value},
+        {:put, keyspace, key, value},
         _,
         %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
       ) do
+    key = "#{keyspace}:#{key}"
     nodes = HashRing.key_to_nodes(r, key, repls)
     send_sync(n, nodes, key, value)
     {:ok, sl, []}
   end
 
   def handle_cast(
-        {:update, key, value, fun},
+        {:update, keyspace, key, value, fun},
         _,
         %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
       ) do
+    key = "#{keyspace}:#{key}"
     nodes = HashRing.key_to_nodes(r, key, repls)
     send_update(n, nodes, key, value, fun)
     {:ok, sl, []}
   end
 
-  def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []}
+  def handle_cast({:put, _keyspace, _key, _value}, _, sl), do: {:ok, sl, []}
 
   def handle_cast(_, _, sl), do: {:ok, sl, []}
 
@@ -285,8 +304,7 @@ defmodule ClusterKV.Ring do
   end
 
   def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do
-    val = DB.get(db, key)
-    send({n, node}, {:reply, val, ref, me})
+    send({n, node}, {:reply, get_key(db, key), ref, me})
     {:ok, sl, []}
   end
 
@@ -296,7 +314,7 @@ defmodule ClusterKV.Ring do
         %SL{data: %Ring{name: n, db: db, node: me}} = sl
       ) do
     val =
-      case DB.get(db, key) do
+      case get_key(db, key) do
         :not_found ->
           :not_found
 
@@ -383,6 +401,17 @@ defmodule ClusterKV.Ring do
     node
   end
 
+  def get_key(db, key) do
+    case DB.get(db, key) do
+      :not_found ->
+        :not_found
+
+      {id, val} ->
+        [_keyspace, key] = String.split(id, ":", parts: 2)
+        {key, val}
+    end
+  end
+
   @spec get_wildcard(
           key :: String.t(),
           parts :: [String.t()],
@@ -492,7 +521,12 @@ defmodule ClusterKV.Ring do
         _ -> true
       end)
 
-  @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok
+  @spec send_sync(
+          name :: module(),
+          nodes :: [node()],
+          key :: {String.t(), String.t()},
+          value :: any()
+        ) :: :ok
   defp send_sync(name, nodes, key, value) do
     Enum.each(nodes, fn n ->
       send({name, n}, {:sync, key, value})
index db73d396c50b1927243107c424fda87044b3ea5e..86612622d92b20be3321fdbf9a64051983528797 100644 (file)
@@ -9,22 +9,22 @@ defmodule ClusterKVTest do
     ]
   ]
 
-  @header "com.myrealm"
+  @keyspace "com.myrealm"
 
   setup_all do
-    db = ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1)
+    ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1)
     [db: TestCluster]
   end
 
   test "test", %{db: db} do
     me = {self(), self()}
-    ClusterKV.put(db, "test1", :hello)
-    ClusterKV.put(db, "test1", :cruel)
-    ClusterKV.put(db, "test1", :world)
+    ClusterKV.put(db, @keyspace, "test1:test", :hello)
+    ClusterKV.put(db, @keyspace, "test1:test", :cruel)
+    ClusterKV.put(db, @keyspace, "test1:test", :world)
 
     ClusterKV.put_wildcard(
       db,
-      @header,
+      @keyspace,
       "com.test..temp",
       {1_003_039_494, me},
       ".",
@@ -32,22 +32,22 @@ defmodule ClusterKVTest do
       ""
     )
 
-    assert {"test1", [:world, :cruel, :hello]} = ClusterKV.get(db, "test1")
+    assert {"test1:test", [:world, :cruel, :hello]} = ClusterKV.get(db, @keyspace, "test1:test")
 
     assert [{["com", "test", "", "temp"], {1_003_039_494, me}}] =
              ClusterKV.wildcard(
                db,
-               @header,
+               @keyspace,
                "com.test.data.temp",
                ".",
                ":",
                ""
              )
 
-    ClusterKV.update(db, "test1", :cruel, fn {id, values}, value ->
+    ClusterKV.update(db, @keyspace, "test1:test", :cruel, fn {id, values}, value ->
       {id, List.delete(values, value)}
     end)
 
-    assert {"test1", [:world, :hello]} = ClusterKV.get(db, "test1")
+    assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test")
   end
 end