From 31620edf6ecd07f0fb2e6eacaae1999628aaaf4e Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 7 Mar 2020 09:16:33 -0600 Subject: [PATCH] separate stream from sync --- lib/cluster_kv/ring.ex | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 085e060..2d44a8a 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -104,10 +104,9 @@ defmodule ClusterKV.Ring do :stream, from, @ready, - %SL{data: %Ring{name: n, node: me, db: db, ring: ring, replicas: repls}} = sl + %SL{data: %Ring{db: db}} = sl ) do - s = sync_db(n, me, db, ring, repls) - {:ok, sl, [{:reply, from, s}]} + {:ok, sl, [{:reply, from, do_stream(db)}]} end def handle_call( @@ -218,14 +217,19 @@ defmodule ClusterKV.Ring do end end + defp do_stream(db) do + db + |> DB.stream() + |> Stream.take_while(fn + :"$end_of_table" -> false + _ -> true + end) + end + defp sync_db(name, me, db, ring, repls) do Task.start(fn -> db - |> DB.stream() - |> Stream.take_while(fn - :"$end_of_table" -> false - _ -> true - end) + |> do_stream() |> Stream.chunk_every(10) |> Stream.each(fn chunk -> Enum.each(chunk, fn key -> @@ -260,12 +264,11 @@ defmodule ClusterKV.Ring do defp get_final_value(res, val) when is_list(res), do: - {:ok, - [val | res] - |> Enum.filter(fn - :not_found -> false - _ -> true - end)} + [val | res] + |> Enum.filter(fn + :not_found -> false + _ -> true + end) defp get_request(ref, reqs) do Enum.find(reqs, fn -- 2.45.3