]> Entropealabs - cluster_kv.git/commitdiff
run batch in task, use saafe_fixtable for db stream
authorChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 14:52:29 +0000 (09:52 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 14:52:29 +0000 (09:52 -0500)
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index a864db884e55f8d1a19dfc4454c1a2accdbe118b..3d553d93a9b1dda44ccfe0bb2508d489026149a3 100644 (file)
@@ -146,12 +146,18 @@ defmodule ClusterKV.DB do
   end
 
   defp get_stream(db) do
+    :ets.safe_fixtable(db, true)
+
     db
     |> :ets.first()
     |> Stream.iterate(&:ets.next(db, &1))
     |> Stream.take_while(fn
-      :"$end_of_table" -> false
-      _ -> true
+      :"$end_of_table" ->
+        :ets.safe_fixtable(db, false)
+        false
+
+      _ ->
+        true
     end)
     |> Stream.map(fn key ->
       do_get(db, key)
index 505a21a8aec71a73b6a587d0057e2623ff579f7c..2924bf8764e5b8d926f19f8c8c98158832df0a50 100644 (file)
@@ -358,24 +358,26 @@ defmodule ClusterKV.Ring do
   end
 
   def redistribute_data(name, me, db, old_ring, new_ring, repls) do
-    batch =
-      db
-      |> DB.stream()
-      |> Enum.reduce(%{}, fn {k, v}, acc ->
-        old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
-        new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
-        nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
-
-        Enum.reduce(nodes, acc, fn
-          ^me, a ->
-            a
-
-          n, a ->
-            Map.update(a, n, [], &[{k, v} | &1])
+    Task.start(fn ->
+      batch =
+        db
+        |> DB.stream()
+        |> Enum.reduce(%{}, fn {k, v}, acc ->
+          old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+          new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+          nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
+
+          Enum.reduce(nodes, acc, fn
+            ^me, a ->
+              a
+
+            n, a ->
+              Map.update(a, n, [], &[{k, v} | &1])
+          end)
         end)
-      end)
 
-    send_batch(name, batch)
+      send_batch(name, batch)
+    end)
   end
 
   defp maybe_reply(ref, reqs, val, node) do