From 86480e316839179301efdaad1ed2ed5dc1d621af Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 9 Mar 2020 09:52:29 -0500 Subject: [PATCH] run batch in task, use saafe_fixtable for db stream --- lib/cluster_kv/db.ex | 10 ++++++++-- lib/cluster_kv/ring.ex | 34 ++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index a864db8..3d553d9 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -146,12 +146,18 @@ defmodule ClusterKV.DB do end defp get_stream(db) do + :ets.safe_fixtable(db, true) + db |> :ets.first() |> Stream.iterate(&:ets.next(db, &1)) |> Stream.take_while(fn - :"$end_of_table" -> false - _ -> true + :"$end_of_table" -> + :ets.safe_fixtable(db, false) + false + + _ -> + true end) |> Stream.map(fn key -> do_get(db, key) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 505a21a..2924bf8 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -358,24 +358,26 @@ defmodule ClusterKV.Ring do end def redistribute_data(name, me, db, old_ring, new_ring, repls) do - batch = - db - |> DB.stream() - |> Enum.reduce(%{}, fn {k, v}, acc -> - old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() - new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() - nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() - - Enum.reduce(nodes, acc, fn - ^me, a -> - a - - n, a -> - Map.update(a, n, [], &[{k, v} | &1]) + Task.start(fn -> + batch = + db + |> DB.stream() + |> Enum.reduce(%{}, fn {k, v}, acc -> + old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new() + new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new() + nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list() + + Enum.reduce(nodes, acc, fn + ^me, a -> + a + + n, a -> + Map.update(a, n, [], &[{k, v} | &1]) + end) end) - end) - send_batch(name, batch) + send_batch(name, batch) + end) end defp maybe_reply(ref, reqs, val, node) do -- 2.45.3