:gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
end
+ def put_wildcard(name, header, key, value, split_on, join, wildcard) do
+ :gen_statem.cast(
+ ClusterKV.ring_name(name),
+ {:put_wildcard, header, key, value, split_on, join, wildcard}
+ )
+ end
+
def get(name, key, timeout \\ 5000) do
:gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
end
:gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
end
+ def wildcard(name, header, key, split_on, join, wildcard) do
+ :gen_statem.call(
+ ClusterKV.ring_name(name),
+ {:wildcard, header, key, split_on, join, wildcard}
+ )
+ end
+
def stream(name) do
:gen_statem.call(ClusterKV.ring_name(name), :stream)
end
def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+ def handle_call(
+ {:wildcard, header, key, split_on, join, wildcard},
+ from,
+ @ready,
+ %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
+ ) do
+ ref = make_ref()
+
+ [_ | [parts]] = String.split(key, join)
+
+ parts = String.split(parts, split_on)
+
+ Enum.each(Enum.with_index(parts), fn {k, i} ->
+ k = Enum.join([header, k, i], join)
+ get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
+ end)
+
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(parts), []} | reqs]}}, []}
+ end
+
+ def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+
+ def handle_cast(
+ {:put_wildcard, header, key, value, split_on, join, wildcard},
+ @ready,
+ %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+ ) do
+ parts =
+ key
+ |> String.split(split_on)
+
+ Enum.take_while(Enum.with_index(parts), fn {k, i} ->
+ case k == wildcard do
+ true ->
+ true
+
+ false ->
+ k = Enum.join([header, k, i], join)
+ nodes = HashRing.key_to_nodes(r, k, repls)
+ send_sync(n, nodes, k, {parts, value})
+ false
+ end
+ end)
+
+ {:ok, sl, []}
+ end
+
def handle_cast(
{:put, key, value},
@ready,
def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []}
+ def handle_cast(_, _, sl), do: {:ok, sl, []}
+
def handle_info({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
Enum.each(keys, fn {key, value} ->
- DB.put(db, key, value)
+ DB.put_or_update(db, key, value)
end)
{:ok, sl, []}
{:ok, sl, []}
end
+ def handle_info(
+ {:get_wildcard, key, parts, wildcard, ref, node},
+ @ready,
+ %SL{data: %Ring{name: n, db: db}} = sl
+ ) do
+ vals = DB.get(db, key)
+
+ vals =
+ case vals do
+ :not_found ->
+ :not_found
+
+ vals ->
+ parts = Enum.with_index(parts)
+ {_k, vals} = vals
+
+ vals =
+ case vals do
+ v when is_list(v) -> v
+ v -> [v]
+ end
+
+ Enum.filter(vals, fn {p, _val} ->
+ Enum.all?(parts, fn {k, i} ->
+ case Enum.at(p, i) do
+ ^k -> true
+ w when w == wildcard -> true
+ _ -> false
+ end
+ end)
+ end)
+ end
+
+ vals =
+ case vals do
+ [] -> :not_found
+ [o] -> o
+ o -> o
+ end
+
+ send({n, node}, {:reply, vals, ref})
+ {:ok, sl, []}
+ end
+
def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
{requests, actions} = maybe_reply(ref, reqs, val)
{:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
send({n, node}, {:get_key, prefix, ref, me})
end
+ defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do
+ node = get_node(key, r, me, repls)
+ send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
+ end
+
defp get_node(key, r, node, repls) do
nodes = HashRing.key_to_nodes(r, key, repls)