defdelegate get(name, key, timeout \\ :infinity), to: Ring
defdelegate put(name, key, value), to: Ring
+ defdelegate update(name, key, value, fun), to: Ring
defdelegate batch(name, batch), to: Ring
defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring
defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring
:gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
end
+ @spec update(name :: module(), key :: String.t(), value :: any(), fun :: fun()) :: :ok
+ def update(name, key, value, fun) do
+ :gen_statem.cast(ClusterKV.ring_name(name), {:update, key, value, fun})
+ end
+
@spec put_wildcard(
name :: module(),
header :: String.t(),
) do
ref = make_ref()
- [_ | [parts]] = String.split(key, join)
-
- parts = String.split(parts, split_on)
+ parts = String.split(key, split_on)
nodes =
Enum.map(Enum.with_index(parts), fn {k, i} ->
{:ok, sl, []}
end
+ def handle_cast(
+ {:update, key, value, fun},
+ _,
+ %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+ ) do
+ nodes = HashRing.key_to_nodes(r, key, repls)
+ send_update(n, nodes, key, value, fun)
+ {:ok, sl, []}
+ end
+
def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []}
def handle_cast(_, _, sl), do: {:ok, sl, []}
{:ok, sl, []}
end
+ def handle_info({:update, key, value, fun}, _, %SL{data: %Ring{db: db}} = sl) do
+ DB.upsert(db, key, value, fun)
+ {:ok, sl, []}
+ end
+
def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do
val = DB.get(db, key)
send({n, node}, {:reply, val, ref, me})
k = Enum.join([header, k, i], join)
nodes = HashRing.key_to_nodes(ring, k, repls)
send_sync(name, nodes, k, {parts, value})
+ false
end)
end
end)
end
+ @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok
+ defp send_update(name, nodes, key, value, fun) do
+ Enum.each(nodes, fn n ->
+ send({name, n}, {:update, key, value, fun})
+ end)
+ end
+
@spec get_existing_nodes(ring :: HashRing.t()) :: HashRing.t()
defp get_existing_nodes(ring) do
Enum.reduce(Node.list(), ring, fn n, acc ->
]
]
+ @header "com.myrealm"
+
setup_all do
db = ClusterKV.start_link(name: TestCluster, topologies: @topologies, replicas: 1, quorum: 1)
[db: TestCluster]
end
test "test", %{db: db} do
+ me = {self(), self()}
ClusterKV.put(db, "test1", :hello)
+ ClusterKV.put(db, "test1", :cruel)
ClusterKV.put(db, "test1", :world)
+
+ ClusterKV.put_wildcard(
+ db,
+ @header,
+ "com.test..temp",
+ {1_003_039_494, me},
+ ".",
+ ":",
+ ""
+ )
+
+ assert {"test1", [:world, :cruel, :hello]} = ClusterKV.get(db, "test1")
+
+ assert [{["com", "test", "", "temp"], {1_003_039_494, me}}] =
+ ClusterKV.wildcard(
+ db,
+ @header,
+ "com.test.data.temp",
+ ".",
+ ":",
+ ""
+ )
+
+ ClusterKV.update(db, "test1", :cruel, fn {id, values}, value ->
+ {id, List.delete(values, value)}
+ end)
+
assert {"test1", [:world, :hello]} = ClusterKV.get(db, "test1")
end
end