From: Christopher Date: Thu, 2 Apr 2020 20:30:17 +0000 (-0500) Subject: adds synchronous update_if function X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=c0391d2d2c41d0a559dd31dec29cf986b0b7c649;p=cluster_kv.git adds synchronous update_if function --- diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 623dad7..7ec853b 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -53,6 +53,7 @@ defmodule ClusterKV do defdelegate get(name, keyspace, key, timeout \\ :infinity), to: Ring defdelegate put(name, keyspace, key, value), to: Ring defdelegate update(name, keyspace, key, value, fun), to: Ring + defdelegate update_if(name, keyspace, key, value, fun), to: Ring defdelegate batch(name, keyspace, batch), to: Ring defdelegate get_wildcard_key(key, split_on, join, wildcard), to: Ring defdelegate put_wildcard(name, keyspace, key, value, split_on, join, wildcard), to: Ring diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index b0ead96..ad7d972 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -25,6 +25,20 @@ defmodule ClusterKV.DB do end) end + @spec update_if( + name :: module(), + key :: binary(), + value :: any(), + fun :: fun(), + timeout :: non_neg_integer() | :infinity + ) :: + element() | :not_found + def update_if(name, key, value, fun, timeout \\ :infinity) do + :poolboy.transaction(name, fn w -> + GenServer.call(w, {:update_if, key, value, fun}, timeout) + end) + end + @spec put(name :: module(), key :: binary(), value :: term()) :: :ok def put(name, key, value) do :poolboy.transaction(name, fn w -> @@ -32,7 +46,7 @@ defmodule ClusterKV.DB do end) end - @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok + @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: fun()) :: :ok def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do :poolboy.transaction(name, fn w -> GenServer.cast(w, {:upsert, key, value, fun}) @@ -71,6 +85,11 @@ defmodule ClusterKV.DB do {:reply, do_get(db, key), state} end + def handle_call({:update_if, key, value, fun}, _from, %DB{db: db} = state) do + Logger.debug("DB handling UPDATE_IF #{key}") + {:reply, do_update_if(db, key, value, fun), state} + end + def handle_cast({:put, key, value}, %DB{db: db} = state) do Logger.debug("DB handling PUT #{key}") :ets.insert(db, {key, [value]}) @@ -158,6 +177,36 @@ defmodule ClusterKV.DB do end end + @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: + :updated | :noop + defp do_update_if(db, key, value, fun) do + case :ets.lookup(db, key) do + [] -> + v = + case value do + val when is_list(val) -> val + v -> [v] + end + + :ets.insert(db, {key, v}) + :updated + + [{_, val} = old] when is_list(val) -> + case fun.(old, value) do + {:update, new} -> + :ets.insert(db, new) + :updated + + _ -> + :noop + end + + [_] -> + :ets.insert(db, {key, [value]}) + :updated + end + end + @spec do_get(db :: module(), key :: String.t()) :: element() | :not_found defp do_get(db, key) do case :ets.lookup(db, key) do diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 8b3cf27..df75eb0 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -55,6 +55,17 @@ defmodule ClusterKV.Ring do :gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun}) end + @spec update_if( + name :: module(), + keyspace :: String.t(), + key :: String.t(), + value :: any(), + fun :: fun() + ) :: :updated | :noop + def update_if(name, keyspace, key, value, fun) do + :gen_statem.call(ClusterKV.ring_name(name), {:update_if, keyspace, key, value, fun}) + end + @spec get_wildcard_key( key :: String.t(), split_on :: String.t(), @@ -196,6 +207,19 @@ defmodule ClusterKV.Ring do {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []} end + def handle_call( + {:update_if, keyspace, key, value, fun}, + from, + @ready, + %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl + ) do + key = "#{keyspace}:#{key}" + node = get_node(key, r, me, repls) + ref = make_ref() + send({n, node}, {:update_if, key, value, fun, ref, me}) + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []} + end + def handle_call({:get, _keyspace, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} @@ -318,6 +342,15 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_info( + {:update_if, key, value, fun, ref, node}, + _, + %SL{data: %Ring{name: n, db: db, node: me}} = sl + ) do + send({n, node}, {:reply, DB.update_if(db, key, value, fun), ref, me}) + {:ok, sl, []} + end + def handle_info( {:get_wildcard, key, parts, wildcard, ref, node}, _, diff --git a/test/cluster_kv_test.exs b/test/cluster_kv_test.exs index 2537a53..3335fba 100644 --- a/test/cluster_kv_test.exs +++ b/test/cluster_kv_test.exs @@ -52,4 +52,23 @@ defmodule ClusterKVTest do assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test") end + + test "update_if", %{db: db} do + assert :updated = + ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2) + + assert :noop = + ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2) + + assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2) + assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2) + assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2) + end + + defp update_if({key, [old]}, val) do + case old == val do + true -> :noop + false -> {:update, {key, [val]}} + end + end end