@type element :: {key :: String.t(), value :: any()} | :not_found
+ @type update_function ::
+ :diff
+ | :not_in
+ | :keystore
+ | :replace
+ | :replace_list
+ | :last_30
+ | :unique
+ | :remove
+ | fun()
+
@type t :: %__MODULE__{
db: :ets.tid() | atom(),
batch: [element()],
name :: module(),
key :: binary(),
value :: any(),
- fun :: fun(),
+ fun :: update_function(),
timeout :: non_neg_integer() | :infinity
) ::
element() | :not_found
end)
end
- @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: fun()) :: :ok
+ @spec upsert(name :: module(), key :: binary(), value :: term(), fun :: update_function()) ::
+ :ok
def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
:poolboy.transaction(name, fn w ->
GenServer.cast(w, {:upsert, key, value, fun})
end)
end
- @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok
+ @spec batch(
+ name :: module(),
+ batch :: [element()],
+ chunk :: integer(),
+ fun :: update_function()
+ ) :: :ok
def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do
:poolboy.transaction(name, fn w ->
GenServer.cast(w, {:batch, batch, chunk, fun})
batch :: [element()],
chunk :: integer(),
last_batch :: integer(),
- fun :: fun()
+ fun :: update_function()
) :: {[element()], integer(), reference()}
defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
Logger.debug("processing batch of #{length(batch)} from #{last_batch} with chunk of #{chunk}")
end
end
- @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: true
+ @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: update_function()) ::
+ true
defp do_upsert(db, key, value, fun) do
get_lock(db, key)
+ fun = get_update_function(fun)
case :ets.lookup(db, key) do
[] ->
release_lock(db, key)
end
- @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: fun()) ::
+ @spec do_update_if(db :: module(), key :: String.t(), value :: any(), fun :: update_function()) ::
:updated | :noop
defp do_update_if(db, key, value, fun) do
get_lock(db, key)
+ fun = get_update_function(fun)
case :ets.lookup(db, key) do
[] ->
end
end
+ defp get_update_function(:diff), do: &diff/2
+ defp get_update_function(:not_in), do: ¬_in/2
+ defp get_update_function(:keystore), do: &keystore/2
+ defp get_update_function(:replace), do: &replace/2
+ defp get_update_function(:replace_list), do: &replace_list/2
+ defp get_update_function(:last_30), do: &last_30/2
+ defp get_update_function(:unique), do: &unique/2
+ defp get_update_function(:remove), do: &remove/2
+ defp get_update_function(fun) when is_function(fun), do: fun
+ defp get_update_function(_), do: fn _, _ -> :noop end
+
+ defp diff({key, [old]}, val) do
+ case old == val do
+ true -> :noop
+ false -> {:update, {key, [val]}}
+ end
+ end
+
+ defp not_in({key, old_vals}, val) do
+ case val in old_vals do
+ true -> :noop
+ false -> {:update, {key, Enum.slice([val | old_vals], 0..10)}}
+ end
+ end
+
+ defp keystore({k, old}, {key, _} = new), do: {k, List.keystore(old, key, 0, new)}
+ defp replace({k, _old}, value), do: {k, [value]}
+ defp replace_list({k, _old}, value), do: {k, value}
+ defp last_30({k, old}, value), do: {k, [value | old] |> Enum.take(30)}
+ defp unique({k, old}, value), do: {k, [value | old] |> Enum.uniq()}
+ defp remove({k, values}, value), do: {k, List.delete(values, value)}
+
@spec do_get(db :: module(), key :: String.t()) :: element() | :not_found
defp do_get(db, key) do
case :ets.lookup(db, key) do
""
)
- ClusterKV.update(db, @keyspace, "test1:test", :cruel, fn {id, values}, value ->
- {id, List.delete(values, value)}
- end)
+ ClusterKV.update(db, @keyspace, "test1:test", :cruel, :remove)
assert {"test1:test", [:world, :hello]} = ClusterKV.get(db, @keyspace, "test1:test")
- end
- test "update_if", %{db: db} do
- assert :updated =
- ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2)
+ ClusterKV.update(db, @keyspace, "test1:test", :hi, :replace)
+
+ assert {"test1:test", [:hi]} = ClusterKV.get(db, @keyspace, "test1:test")
+
+ ClusterKV.update(db, @keyspace, "test1:test", [:hello, :beautiful, :world], :replace_list)
+
+ assert {"test1:test", [:hello, :beautiful, :world]} =
+ ClusterKV.get(db, @keyspace, "test1:test")
- assert :noop =
- ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, &update_if/2)
+ ClusterKV.update(db, @keyspace, "test1:test", :beautiful, :last_30)
- assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
- assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
- assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, &update_if/2)
+ assert {"test1:test", [:beautiful, :hello, :beautiful, :world]} =
+ ClusterKV.get(db, @keyspace, "test1:test")
+
+ ClusterKV.update(db, @keyspace, "test1:test", :beautiful, :unique)
+
+ assert {"test1:test", [:beautiful, :hello, :world]} =
+ ClusterKV.get(db, @keyspace, "test1:test")
end
- defp update_if({key, [old]}, val) do
- case old == val do
- true -> :noop
- false -> {:update, {key, [val]}}
- end
+ test "update_if", %{db: db} do
+ assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, :diff)
+
+ assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :initial_value, :diff)
+
+ assert :updated = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, :diff)
+ assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, :diff)
+ assert :noop = ClusterKV.update_if(db, @keyspace, "test.profile", :new_value, :diff)
end
end