From 54f501b9c61b5c70467e044e5cd2984b672ee1fa Mon Sep 17 00:00:00 2001 From: Christopher Date: Fri, 6 Mar 2020 20:46:21 -0600 Subject: [PATCH] adds prefix lookup --- lib/cluster_kv/db.ex | 28 +++++++++++++++--- lib/cluster_kv/ring.ex | 66 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index fcb7c52..f49cc39 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -55,10 +55,7 @@ defmodule ClusterKV.DB do end def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do - {:reply, - db - |> :rocksdb.get(prefix, []) - |> deserialize(), state} + {:reply, seek_iterator(db, prefix), state} end def handle_cast({:put, key, value}, %DB{db: db} = state) do @@ -66,6 +63,29 @@ defmodule ClusterKV.DB do {:noreply, state} end + def seek_iterator(db, prefix, acc \\ []) do + {:ok, itr} = :rocksdb.iterator(db, []) + seek_loop(:rocksdb.iterator_move(itr, {:seek, prefix}), prefix, itr, acc) + end + + def seek_loop({:error, :iterator_closed}, _pre, _itr, acc), do: acc + def seek_loop({:error, :invalid_iterator}, _pre, _itr, acc), do: acc + + def seek_loop({:ok, key, val}, prefix, itr, acc) do + case String.starts_with?(key, prefix) do + true -> + seek_loop( + :rocksdb.iterator_move(itr, :next), + prefix, + itr, + [{key, :erlang.binary_to_term(val)} | acc] + ) + + false -> + acc + end + end + def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)} def deserialize(other), do: other diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index a861ca5..160281c 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -109,12 +109,30 @@ defmodule ClusterKV.Ring do end ref = make_ref() - send({n, node}, {:get_key, key, ref, Node.self()}) - {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []} + send({n, node}, {:get_key, key, ref, node}) + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []} end def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_call( + {:prefix, prefix}, + from, + @ready, + %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl + ) do + nodes = HashRing.nodes(r) + ref = make_ref() + + Enum.each(nodes, fn node -> + send({n, node}, {:get_prefix, prefix, ref, node}) + end) + + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(nodes), []} | reqs]}}, []} + end + + def handle_call({:prefix, _prefix}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_cast( {:put, key, value}, @ready, @@ -143,11 +161,19 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end - def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do - {_, from} = get_request(ref, reqs) + def handle_info( + {:get_prefix, prefix, ref, node}, + @ready, + %SL{data: %Ring{name: n, db: db}} = sl + ) do + val = DB.get_prefix(db, prefix) + send({n, node}, {:reply, val, ref}) + {:ok, sl, []} + end - {:ok, %SL{sl | data: %Ring{data | requests: remove_request(ref, reqs)}}, - [{:reply, from, val}]} + def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do + {requests, actions} = maybe_reply(ref, reqs, val) + {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions} end def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do @@ -164,17 +190,39 @@ defmodule ClusterKV.Ring do [{:next_event, :internal, :node_down}]} end + defp maybe_reply(ref, reqs, val) do + case get_request(ref, reqs) do + {_, from, 1, res} -> + {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]} + + _ -> + {decrement_request(ref, reqs, val), []} + end + end + + defp get_final_value([], val), do: val + + defp get_final_value(res, val) when is_list(res), + do: {:ok, MapSet.new(List.flatten([val | res]))} + defp get_request(ref, reqs) do Enum.find(reqs, fn - {^ref, _} -> true + {^ref, _, _, _} -> true _ -> false end) end + defp decrement_request(ref, reqs, val) do + Enum.map(reqs, fn + {^ref, f, c, res} -> {ref, f, c - 1, [val | res]} + other -> other + end) + end + defp remove_request(ref, requests) do Enum.filter(requests, fn - {^ref, _} -> false - {_, _} -> true + {^ref, _, _, _} -> false + {_, _, _, _} -> true end) end -- 2.45.3