From: Christopher Coté Date: Wed, 25 Nov 2020 03:09:15 +0000 (-0600) Subject: use local functions for update lambdas X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=40710a9296fa9ca985aa3ccefb7f5e648c458b40;p=cluster_kv.git use local functions for update lambdas --- diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index f86555a..573aa9d 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -9,6 +9,17 @@ defmodule ClusterKV.DB do @type element :: {key :: String.t(), value :: any()} | :not_found + @type update_function :: + :diff + | :not_in + | :keystore + | :replace + | :replace_list + | :last_30 + | :unique + | :remove + | fun() + @type t :: %__MODULE__{ db: :ets.tid() | atom(), batch: [element()], @@ -30,7 +41,7 @@ defmodule ClusterKV.DB do name :: module(), key :: binary(), value :: any(), - fun :: fun(), + fun :: update_function(), timeout :: non_neg_integer() | :infinity ) :: element() | :not_found @@ -47,14 +58,20 @@ defmodule ClusterKV.DB do end) end - @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: fun()) :: :ok + @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: update_function()) :: + :ok def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do :poolboy.transaction(name, fn w -> GenServer.cast(w, {:upsert, key, value, fun}) end) end - @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok + @spec batch( + name :: module(), + batch :: [element()], + chunk :: integer(), + fun :: update_function() + ) :: :ok def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do :poolboy.transaction(name, fn w -> GenServer.cast(w, {:batch, batch, chunk, fun}) @@ -135,7 +152,7 @@ defmodule ClusterKV.DB do batch :: [element()], chunk :: integer(), last_batch :: integer(), - fun :: fun() + fun :: update_function() ) :: {[element()], integer(), reference()} defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do Logger.debug("processing batch of #{length(batch)} from #{last_batch} with chunk of #{chunk}") @@ -157,9 +174,11 @@ defmodule ClusterKV.DB do end end - @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: true + @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: update_function()) :: + true defp do_upsert(db, key, value, fun) do get_lock(db, key) + fun = get_update_function(fun) case :ets.lookup(db, key) do [] -> @@ -182,10 +201,11 @@ defmodule ClusterKV.DB do release_lock(db, key) end - @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: + @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: update_function()) :: :updated | :noop defp do_update_if(db, key, value, fun) do get_lock(db, key) + fun = get_update_function(fun) case :ets.lookup(db, key) do [] -> @@ -218,6 +238,38 @@ defmodule ClusterKV.DB do end end + defp get_update_function(:diff), do: &diff/2 + defp get_update_function(:not_in), do: ¬_in/2 + defp get_update_function(:keystore), do: &keystore/2 + defp get_update_function(:replace), do: &replace/2 + defp get_update_function(:replace_list), do: &replace_list/2 + defp get_update_function(:last_30), do: &last_30/2 + defp get_update_function(:unique), do: &unique/2 + defp get_update_function(:remove), do: &remove/2 + defp get_update_function(fun) when is_function(fun), do: fun + defp get_update_function(_), do: fn _, _ -> :noop end + + defp diff({key, [old]}, val) do + case old == val do + true -> :noop + false -> {:update, {key, [val]}} + end + end + + defp not_in({key, old_vals}, val) do + case val in old_vals do + true -> :noop + false -> {:update, {key, Enum.slice([val | old_vals], 0..10)}} + end + end + + defp keystore({k, old}, {key, _} = new), do: {k, List.keystore(old, key, 0, new)} + defp replace({k, _old}, value), do: {k, [value]} + defp replace_list({k, _old}, value), do: {k, value} + defp last_30({k, old}, value), do: {k, [value | old] |> Enum.take(30)} + defp unique({k, old}, value), do: {k, [value | old] |> Enum.uniq()} + defp remove({k, values}, value), do: {k, List.delete(values, value)} + @spec do_get(db :: module(), key :: String.t()) :: element() | :not_found defp do_get(db, key) do case :ets.lookup(db, key) do diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 2b50541..6754c34 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -50,7 +50,7 @@ defmodule ClusterKV.Ring do keyspace :: String.t(), key :: String.t(), value :: any(), - fun :: fun() + fun :: atom() ) :: :ok def update(name, keyspace, key, value, fun) do :gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun}) @@ -61,7 +61,7 @@ defmodule ClusterKV.Ring do keyspace :: String.t(), key :: String.t(), value :: any(), - fun :: fun() + fun :: atom() ) :: :updated | :noop def update_if(name, keyspace, key, value, fun) do :gen_statem.call(ClusterKV.ring_name(name), {:update_if, keyspace, key, value, fun}) diff --git a/test/cluster_kv_test.exs b/test/cluster_kv_test.exs index 3335fba..e0493e5 100644 --- a/test/cluster_kv_test.exs +++ b/test/cluster_kv_test.exs @@ -46,29 +46,37 @@ defmodule ClusterKVTest do "" ) - ClusterKV.update(db, @keyspace, "test1:test", :cruel, fn {id, values}, value -> - {id, List.delete(values, value)} - end) + ClusterKV.update(db, @keyspace, "test1:test", :cruel, :remove) assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test") - end - test "update_if", %{db: db} do - assert :updated = - ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2) + ClusterKV.update(db, @keyspace, "test1:test", :hi, :replace) + + assert {"test1:test", [:hi]} = ClusterKV.get(db, @keyspace, "test1:test") + + ClusterKV.update(db, @keyspace, "test1:test", [:hello, :beautiful, :world], :replace_list) + + assert {"test1:test", [:hello, :beautiful, :world]} = + ClusterKV.get(db, @keyspace, "test1:test") - assert :noop = - ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2) + ClusterKV.update(db, @keyspace, "test1:test", :beautiful, :last_30) - assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2) - assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2) - assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2) + assert {"test1:test", [:beautiful, :hello, :beautiful, :world]} = + ClusterKV.get(db, @keyspace, "test1:test") + + ClusterKV.update(db, @keyspace, "test1:test", :beautiful, :unique) + + assert {"test1:test", [:beautiful, :hello, :world]} = + ClusterKV.get(db, @keyspace, "test1:test") end - defp update_if({key, [old]}, val) do - case old == val do - true -> :noop - false -> {:update, {key, [val]}} - end + test "update_if", %{db: db} do + assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, :diff) + + assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, :diff) + + assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, :diff) + assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, :diff) + assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, :diff) end end