]> Entropealabs - cluster_kv.git/commitdiff
separate stream from sync
authorChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 15:16:33 +0000 (09:16 -0600)
committerChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 15:16:33 +0000 (09:16 -0600)
lib/cluster_kv/ring.ex

index 085e06072a72bc236c7a4496fa8fadf23972c101..2d44a8a977a5cacd26712adbce06e89784c2736e 100644 (file)
@@ -104,10 +104,9 @@ defmodule ClusterKV.Ring do
         :stream,
         from,
         @ready,
-        %SL{data: %Ring{name: n, node: me, db: db, ring: ring, replicas: repls}} = sl
+        %SL{data: %Ring{db: db}} = sl
       ) do
-    s = sync_db(n, me, db, ring, repls)
-    {:ok, sl, [{:reply, from, s}]}
+    {:ok, sl, [{:reply, from, do_stream(db)}]}
   end
 
   def handle_call(
@@ -218,14 +217,19 @@ defmodule ClusterKV.Ring do
     end
   end
 
+  defp do_stream(db) do
+    db
+    |> DB.stream()
+    |> Stream.take_while(fn
+      :"$end_of_table" -> false
+      _ -> true
+    end)
+  end
+
   defp sync_db(name, me, db, ring, repls) do
     Task.start(fn ->
       db
-      |> DB.stream()
-      |> Stream.take_while(fn
-        :"$end_of_table" -> false
-        _ -> true
-      end)
+      |> do_stream()
       |> Stream.chunk_every(10)
       |> Stream.each(fn chunk ->
         Enum.each(chunk, fn key ->
@@ -260,12 +264,11 @@ defmodule ClusterKV.Ring do
 
   defp get_final_value(res, val) when is_list(res),
     do:
-      {:ok,
-       [val | res]
-       |> Enum.filter(fn
-         :not_found -> false
-         _ -> true
-       end)}
+      [val | res]
+      |> Enum.filter(fn
+        :not_found -> false
+        _ -> true
+      end)
 
   defp get_request(ref, reqs) do
     Enum.find(reqs, fn