:stream,
from,
@ready,
- %SL{data: %Ring{name: n, node: me, db: db, ring: ring, replicas: repls}} = sl
+ %SL{data: %Ring{db: db}} = sl
) do
- s = sync_db(n, me, db, ring, repls)
- {:ok, sl, [{:reply, from, s}]}
+ {:ok, sl, [{:reply, from, do_stream(db)}]}
end
def handle_call(
end
end
+ defp do_stream(db) do
+ db
+ |> DB.stream()
+ |> Stream.take_while(fn
+ :"$end_of_table" -> false
+ _ -> true
+ end)
+ end
+
defp sync_db(name, me, db, ring, repls) do
Task.start(fn ->
db
- |> DB.stream()
- |> Stream.take_while(fn
- :"$end_of_table" -> false
- _ -> true
- end)
+ |> do_stream()
|> Stream.chunk_every(10)
|> Stream.each(fn chunk ->
Enum.each(chunk, fn key ->
defp get_final_value(res, val) when is_list(res),
do:
- {:ok,
- [val | res]
- |> Enum.filter(fn
- :not_found -> false
- _ -> true
- end)}
+ [val | res]
+ |> Enum.filter(fn
+ :not_found -> false
+ _ -> true
+ end)
defp get_request(ref, reqs) do
Enum.find(reqs, fn