From e30152fd4d29733688f201aa08d1f7bf625bb6ce Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 7 Mar 2020 16:08:11 -0600 Subject: [PATCH] initial wildcard support, it ain't pretty... --- lib/cluster_kv.ex | 2 + lib/cluster_kv/db.ex | 15 ++++++ lib/cluster_kv/ring.ex | 114 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 130 insertions(+), 1 deletion(-) diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index a8fe61b..a8a42fd 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -44,6 +44,8 @@ defmodule ClusterKV do defdelegate get(name, key), to: Ring defdelegate put(name, key, value), to: Ring + defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring defdelegate prefix(name, key, split_on, min), to: Ring defdelegate stream(name), to: Ring + defdelegate wildcard(name, header, key, split_on, join, wildcard), to: Ring end diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 86e5a88..ffd69cc 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -22,6 +22,11 @@ defmodule ClusterKV.DB do GenServer.cast(name, {:put, key, value}) end + @spec put_or_update(name :: module(), key :: binary(), value :: term()) :: :ok + def put_or_update(name, key, value) do + GenServer.cast(name, {:put_or_update, key, value}) + end + def stream(name) do GenServer.call(name, :stream) end @@ -52,4 +57,14 @@ defmodule ClusterKV.DB do :ets.insert(db, {key, value}) {:noreply, state} end + + def handle_cast({:put_or_update, key, value}, %DB{db: db} = state) do + case :ets.lookup(db, key) do + [] -> :ets.insert(db, {key, value}) + [{_, val}] when is_list(val) -> :ets.insert(db, {key, [value | val]}) + [{_, val}] -> :ets.insert(db, {key, [value, val]}) + end + + {:noreply, state} + end end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 0e61050..0cbdb34 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -48,6 +48,13 @@ defmodule ClusterKV.Ring do :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value}) end + def put_wildcard(name, header, key, value, split_on, join, wildcard) do + :gen_statem.cast( + ClusterKV.ring_name(name), + {:put_wildcard, header, key, value, split_on, join, wildcard} + ) + end + def get(name, key, timeout \\ 5000) do :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout) end @@ -56,6 +63,13 @@ defmodule ClusterKV.Ring do :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}) end + def wildcard(name, header, key, split_on, join, wildcard) do + :gen_statem.call( + ClusterKV.ring_name(name), + {:wildcard, header, key, split_on, join, wildcard} + ) + end + def stream(name) do :gen_statem.call(ClusterKV.ring_name(name), :stream) end @@ -144,6 +158,53 @@ defmodule ClusterKV.Ring do def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_call( + {:wildcard, header, key, split_on, join, wildcard}, + from, + @ready, + %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl + ) do + ref = make_ref() + + [_ | [parts]] = String.split(key, join) + + parts = String.split(parts, split_on) + + Enum.each(Enum.with_index(parts), fn {k, i} -> + k = Enum.join([header, k, i], join) + get_wildcard(k, parts, wildcard, r, me, repls, n, ref) + end) + + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(parts), []} | reqs]}}, []} + end + + def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + + def handle_cast( + {:put_wildcard, header, key, value, split_on, join, wildcard}, + @ready, + %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl + ) do + parts = + key + |> String.split(split_on) + + Enum.take_while(Enum.with_index(parts), fn {k, i} -> + case k == wildcard do + true -> + true + + false -> + k = Enum.join([header, k, i], join) + nodes = HashRing.key_to_nodes(r, k, repls) + send_sync(n, nodes, k, {parts, value}) + false + end + end) + + {:ok, sl, []} + end + def handle_cast( {:put, key, value}, @ready, @@ -156,9 +217,11 @@ defmodule ClusterKV.Ring do def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []} + def handle_cast(_, _, sl), do: {:ok, sl, []} + def handle_info({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do Enum.each(keys, fn {key, value} -> - DB.put(db, key, value) + DB.put_or_update(db, key, value) end) {:ok, sl, []} @@ -172,6 +235,50 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + def handle_info( + {:get_wildcard, key, parts, wildcard, ref, node}, + @ready, + %SL{data: %Ring{name: n, db: db}} = sl + ) do + vals = DB.get(db, key) + + vals = + case vals do + :not_found -> + :not_found + + vals -> + parts = Enum.with_index(parts) + {_k, vals} = vals + + vals = + case vals do + v when is_list(v) -> v + v -> [v] + end + + Enum.filter(vals, fn {p, _val} -> + Enum.all?(parts, fn {k, i} -> + case Enum.at(p, i) do + ^k -> true + w when w == wildcard -> true + _ -> false + end + end) + end) + end + + vals = + case vals do + [] -> :not_found + [o] -> o + o -> o + end + + send({n, node}, {:reply, vals, ref}) + {:ok, sl, []} + end + def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do {requests, actions} = maybe_reply(ref, reqs, val) {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions} @@ -205,6 +312,11 @@ defmodule ClusterKV.Ring do send({n, node}, {:get_key, prefix, ref, me}) end + defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do + node = get_node(key, r, me, repls) + send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me}) + end + defp get_node(key, r, node, repls) do nodes = HashRing.key_to_nodes(r, key, repls) -- 2.45.3