From: Christopher Date: Mon, 9 Mar 2020 14:52:29 +0000 (-0500) Subject: run batch in task, use saafe_fixtable for db stream X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=86480e316839179301efdaad1ed2ed5dc1d621af;p=cluster_kv.git run batch in task, use saafe_fixtable for db stream --- diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index a864db8..3d553d9 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -146,12 +146,18 @@ defmodule ClusterKV.DB do end defp get_stream(db) do + :ets.safe_fixtable(db, true) + db |> :ets.first() |> Stream.iterate(&:ets.next(db, &1)) |> Stream.take_while(fn - :"$end_of_table" -> false - _ -> true + :"$end_of_table" -> + :ets.safe_fixtable(db, false) + false + + _ -> + true end) |> Stream.map(fn key -> do_get(db, key) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 505a21a..2924bf8 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -358,24 +358,26 @@ defmodule ClusterKV.Ring do end def redistribute_data(name, me, db, old_ring, new_ring, repls) do - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() - new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() - nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() - - Enum.reduce(nodes, acc, fn - ^me, a -> - a - - n, a -> - Map.update(a, n, [], &[{k, v} | &1]) + Task.start(fn -> + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() + new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() + nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() + + Enum.reduce(nodes, acc, fn + ^me, a -> + a + + n, a -> + Map.update(a, n, [], &[{k, v} | &1]) + end) end) - end) - send_batch(name, batch) + send_batch(name, batch) + end) end defp maybe_reply(ref, reqs, val, node) do