end
def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do
- {:reply,
- db
- |> :rocksdb.get(prefix, [])
- |> deserialize(), state}
+ {:reply, seek_iterator(db, prefix), state}
end
def handle_cast({:put, key, value}, %DB{db: db} = state) do
{:noreply, state}
end
+ def seek_iterator(db, prefix, acc \\ []) do
+ {:ok, itr} = :rocksdb.iterator(db, [])
+ seek_loop(:rocksdb.iterator_move(itr, {:seek, prefix}), prefix, itr, acc)
+ end
+
+ def seek_loop({:error, :iterator_closed}, _pre, _itr, acc), do: acc
+ def seek_loop({:error, :invalid_iterator}, _pre, _itr, acc), do: acc
+
+ def seek_loop({:ok, key, val}, prefix, itr, acc) do
+ case String.starts_with?(key, prefix) do
+ true ->
+ seek_loop(
+ :rocksdb.iterator_move(itr, :next),
+ prefix,
+ itr,
+ [{key, :erlang.binary_to_term(val)} | acc]
+ )
+
+ false ->
+ acc
+ end
+ end
+
def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)}
def deserialize(other), do: other
end
ref = make_ref()
- send({n, node}, {:get_key, key, ref, Node.self()})
- {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []}
+ send({n, node}, {:get_key, key, ref, node})
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []}
end
def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+ def handle_call(
+ {:prefix, prefix},
+ from,
+ @ready,
+ %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl
+ ) do
+ nodes = HashRing.nodes(r)
+ ref = make_ref()
+
+ Enum.each(nodes, fn node ->
+ send({n, node}, {:get_prefix, prefix, ref, node})
+ end)
+
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(nodes), []} | reqs]}}, []}
+ end
+
+ def handle_call({:prefix, _prefix}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+
def handle_cast(
{:put, key, value},
@ready,
{:ok, sl, []}
end
- def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
- {_, from} = get_request(ref, reqs)
+ def handle_info(
+ {:get_prefix, prefix, ref, node},
+ @ready,
+ %SL{data: %Ring{name: n, db: db}} = sl
+ ) do
+ val = DB.get_prefix(db, prefix)
+ send({n, node}, {:reply, val, ref})
+ {:ok, sl, []}
+ end
- {:ok, %SL{sl | data: %Ring{data | requests: remove_request(ref, reqs)}},
- [{:reply, from, val}]}
+ def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
+ {requests, actions} = maybe_reply(ref, reqs, val)
+ {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
end
def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
[{:next_event, :internal, :node_down}]}
end
+ defp maybe_reply(ref, reqs, val) do
+ case get_request(ref, reqs) do
+ {_, from, 1, res} ->
+ {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+
+ _ ->
+ {decrement_request(ref, reqs, val), []}
+ end
+ end
+
+ defp get_final_value([], val), do: val
+
+ defp get_final_value(res, val) when is_list(res),
+ do: {:ok, MapSet.new(List.flatten([val | res]))}
+
defp get_request(ref, reqs) do
Enum.find(reqs, fn
- {^ref, _} -> true
+ {^ref, _, _, _} -> true
_ -> false
end)
end
+ defp decrement_request(ref, reqs, val) do
+ Enum.map(reqs, fn
+ {^ref, f, c, res} -> {ref, f, c - 1, [val | res]}
+ other -> other
+ end)
+ end
+
defp remove_request(ref, requests) do
Enum.filter(requests, fn
- {^ref, _} -> false
- {_, _} -> true
+ {^ref, _, _, _} -> false
+ {_, _, _, _} -> true
end)
end