]> Entropealabs - cluster_kv.git/commitdiff
adds synchronous update_if function
authorChristopher <chris@entropealabs.com>
Thu, 2 Apr 2020 20:30:17 +0000 (15:30 -0500)
committerChristopher <chris@entropealabs.com>
Thu, 2 Apr 2020 20:30:17 +0000 (15:30 -0500)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex
test/cluster_kv_test.exs

index 623dad75bf1029dfdafa92e952d464d875952bc7..7ec853b1a54313c7ad130a42fbf4d9e707c5975e 100644 (file)
@@ -53,6 +53,7 @@ defmodule ClusterKV do
   defdelegate get(name, keyspace, key, timeout \\ :infinity), to: Ring
   defdelegate put(name, keyspace, key, value), to: Ring
   defdelegate update(name, keyspace, key, value, fun), to: Ring
+  defdelegate update_if(name, keyspace, key, value, fun), to: Ring
   defdelegate batch(name, keyspace, batch), to: Ring
   defdelegate get_wildcard_key(key, split_on, join, wildcard), to: Ring
   defdelegate put_wildcard(name, keyspace, key, value, split_on, join, wildcard), to: Ring
index b0ead963995d26562958d01c00578c113484af81..ad7d9727ab7714bc20ba3b78cd69f9e12df9a778 100644 (file)
@@ -25,6 +25,20 @@ defmodule ClusterKV.DB do
     end)
   end
 
+  @spec update_if(
+          name :: module(),
+          key :: binary(),
+          value :: any(),
+          fun :: fun(),
+          timeout :: non_neg_integer() | :infinity
+        ) ::
+          element() | :not_found
+  def update_if(name, key, value, fun, timeout \\ :infinity) do
+    :poolboy.transaction(name, fn w ->
+      GenServer.call(w, {:update_if, key, value, fun}, timeout)
+    end)
+  end
+
   @spec put(name :: module(), key :: binary(), value :: term()) :: :ok
   def put(name, key, value) do
     :poolboy.transaction(name, fn w ->
@@ -32,7 +46,7 @@ defmodule ClusterKV.DB do
     end)
   end
 
-  @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok
+  @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: fun()) :: :ok
   def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
     :poolboy.transaction(name, fn w ->
       GenServer.cast(w, {:upsert, key, value, fun})
@@ -71,6 +85,11 @@ defmodule ClusterKV.DB do
     {:reply, do_get(db, key), state}
   end
 
+  def handle_call({:update_if, key, value, fun}, _from, %DB{db: db} = state) do
+    Logger.debug("DB handling UPDATE_IF #{key}")
+    {:reply, do_update_if(db, key, value, fun), state}
+  end
+
   def handle_cast({:put, key, value}, %DB{db: db} = state) do
     Logger.debug("DB handling PUT #{key}")
     :ets.insert(db, {key, [value]})
@@ -158,6 +177,36 @@ defmodule ClusterKV.DB do
     end
   end
 
+  @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: fun()) ::
+          :updated | :noop
+  defp do_update_if(db, key, value, fun) do
+    case :ets.lookup(db, key) do
+      [] ->
+        v =
+          case value do
+            val when is_list(val) -> val
+            v -> [v]
+          end
+
+        :ets.insert(db, {key, v})
+        :updated
+
+      [{_, val} = old] when is_list(val) ->
+        case fun.(old, value) do
+          {:update, new} ->
+            :ets.insert(db, new)
+            :updated
+
+          _ ->
+            :noop
+        end
+
+      [_] ->
+        :ets.insert(db, {key, [value]})
+        :updated
+    end
+  end
+
   @spec do_get(db :: module(), key :: String.t()) :: element() | :not_found
   defp do_get(db, key) do
     case :ets.lookup(db, key) do
index 8b3cf276572f14d83604d221deb8bc3ef3b0362a..df75eb0de0f8a7baf9eaed4b342b9422cc01d824 100644 (file)
@@ -55,6 +55,17 @@ defmodule ClusterKV.Ring do
     :gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun})
   end
 
+  @spec update_if(
+          name :: module(),
+          keyspace :: String.t(),
+          key :: String.t(),
+          value :: any(),
+          fun :: fun()
+        ) :: :updated | :noop
+  def update_if(name, keyspace, key, value, fun) do
+    :gen_statem.call(ClusterKV.ring_name(name), {:update_if, keyspace, key, value, fun})
+  end
+
   @spec get_wildcard_key(
           key :: String.t(),
           split_on :: String.t(),
@@ -196,6 +207,19 @@ defmodule ClusterKV.Ring do
     {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
   end
 
+  def handle_call(
+        {:update_if, keyspace, key, value, fun},
+        from,
+        @ready,
+        %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
+      ) do
+    key = "#{keyspace}:#{key}"
+    node = get_node(key, r, me, repls)
+    ref = make_ref()
+    send({n, node}, {:update_if, key, value, fun, ref, me})
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
+  end
+
   def handle_call({:get, _keyspace, _key}, from, _, sl),
     do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
@@ -318,6 +342,15 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  def handle_info(
+        {:update_if, key, value, fun, ref, node},
+        _,
+        %SL{data: %Ring{name: n, db: db, node: me}} = sl
+      ) do
+    send({n, node}, {:reply, DB.update_if(db, key, value, fun), ref, me})
+    {:ok, sl, []}
+  end
+
   def handle_info(
         {:get_wildcard, key, parts, wildcard, ref, node},
         _,
index 2537a534beca910ed548143d4839b6a68d660a28..3335fba3954e133b581e8f99870540243c866722 100644 (file)
@@ -52,4 +52,23 @@ defmodule ClusterKVTest do
 
     assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test")
   end
+
+  test "update_if", %{db: db} do
+    assert :updated =
+             ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2)
+
+    assert :noop =
+             ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2)
+
+    assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+    assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+    assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+  end
+
+  defp update_if({key, [old]}, val) do
+    case old == val do
+      true -> :noop
+      false -> {:update, {key, [val]}}
+    end
+  end
 end