defdelegate get(name, keyspace, key, timeout \\ :infinity), to: Ring
defdelegate put(name, keyspace, key, value), to: Ring
defdelegate update(name, keyspace, key, value, fun), to: Ring
+ defdelegate update_if(name, keyspace, key, value, fun), to: Ring
defdelegate batch(name, keyspace, batch), to: Ring
defdelegate get_wildcard_key(key, split_on, join, wildcard), to: Ring
defdelegate put_wildcard(name, keyspace, key, value, split_on, join, wildcard), to: Ring
end)
end
+ @spec update_if(
+ name :: module(),
+ key :: binary(),
+ value :: any(),
+ fun :: fun(),
+ timeout :: non_neg_integer() | :infinity
+ ) ::
+ element() | :not_found
+ def update_if(name, key, value, fun, timeout \\ :infinity) do
+ :poolboy.transaction(name, fn w ->
+ GenServer.call(w, {:update_if, key, value, fun}, timeout)
+ end)
+ end
+
@spec put(name :: module(), key :: binary(), value :: term()) :: :ok
def put(name, key, value) do
:poolboy.transaction(name, fn w ->
end)
end
- @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok
+ @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: fun()) :: :ok
def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
:poolboy.transaction(name, fn w ->
GenServer.cast(w, {:upsert, key, value, fun})
{:reply, do_get(db, key), state}
end
+ def handle_call({:update_if, key, value, fun}, _from, %DB{db: db} = state) do
+ Logger.debug("DB handling UPDATE_IF #{key}")
+ {:reply, do_update_if(db, key, value, fun), state}
+ end
+
def handle_cast({:put, key, value}, %DB{db: db} = state) do
Logger.debug("DB handling PUT #{key}")
:ets.insert(db, {key, [value]})
end
end
+ @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: fun()) ::
+ :updated | :noop
+ defp do_update_if(db, key, value, fun) do
+ case :ets.lookup(db, key) do
+ [] ->
+ v =
+ case value do
+ val when is_list(val) -> val
+ v -> [v]
+ end
+
+ :ets.insert(db, {key, v})
+ :updated
+
+ [{_, val} = old] when is_list(val) ->
+ case fun.(old, value) do
+ {:update, new} ->
+ :ets.insert(db, new)
+ :updated
+
+ _ ->
+ :noop
+ end
+
+ [_] ->
+ :ets.insert(db, {key, [value]})
+ :updated
+ end
+ end
+
@spec do_get(db :: module(), key :: String.t()) :: element() | :not_found
defp do_get(db, key) do
case :ets.lookup(db, key) do
:gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun})
end
+ @spec update_if(
+ name :: module(),
+ keyspace :: String.t(),
+ key :: String.t(),
+ value :: any(),
+ fun :: fun()
+ ) :: :updated | :noop
+ def update_if(name, keyspace, key, value, fun) do
+ :gen_statem.call(ClusterKV.ring_name(name), {:update_if, keyspace, key, value, fun})
+ end
+
@spec get_wildcard_key(
key :: String.t(),
split_on :: String.t(),
{:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
end
+ def handle_call(
+ {:update_if, keyspace, key, value, fun},
+ from,
+ @ready,
+ %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
+ ) do
+ key = "#{keyspace}:#{key}"
+ node = get_node(key, r, me, repls)
+ ref = make_ref()
+ send({n, node}, {:update_if, key, value, fun, ref, me})
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
+ end
+
def handle_call({:get, _keyspace, _key}, from, _, sl),
do: {:ok, sl, [{:reply, from, :no_quorum}]}
{:ok, sl, []}
end
+ def handle_info(
+ {:update_if, key, value, fun, ref, node},
+ _,
+ %SL{data: %Ring{name: n, db: db, node: me}} = sl
+ ) do
+ send({n, node}, {:reply, DB.update_if(db, key, value, fun), ref, me})
+ {:ok, sl, []}
+ end
+
def handle_info(
{:get_wildcard, key, parts, wildcard, ref, node},
_,
assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test")
end
+
+ test "update_if", %{db: db} do
+ assert :updated =
+ ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2)
+
+ assert :noop =
+ ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2)
+
+ assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+ assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+ assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+ end
+
+ defp update_if({key, [old]}, val) do
+ case old == val do
+ true -> :noop
+ false -> {:update, {key, [val]}}
+ end
+ end
end