@db_options [:set, :public]
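+ # New fields for chunked batches: the entries still to be written, the chunk
+ # size, the merge fun, and the offset of the next chunk to apply.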
- defstruct [:db]
+ defstruct [:db, :batch, :batch_chunk, :batch_fun, last_batch: 0]
@type t :: %__MODULE__{
- db: :ets.tid() | atom()
+ db: :ets.tid() | atom(),
+ batch: list(),
+ batch_chunk: integer(),
+ batch_fun: fun(),
+ last_batch: integer()
}
@spec get(name :: module(), key :: binary()) :: [tuple()]
GenServer.cast(name, {:upsert, key, value, fun})
end
- def batch(name, batch, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
- GenServer.cast(name, {:batch, batch, fun})
+ def batch(name, batch, chunk \\ 100, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
+ GenServer.cast(name, {:batch, batch, chunk, fun})
end
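+ # For illustration only (module and key names are made up): queue 1_000 writes
+ # against a DB process registered as MyApp.DB, applied 100 entries at a time so
+ # other messages can interleave between chunks:
+ #
+ #   DB.batch(MyApp.DB, Enum.map(1..1_000, fn i -> {"key_#{i}", i} end), 100)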
def stream(name, timeout \\ :infinity) do
{:noreply, state}
end
- def handle_cast({:batch, batch, fun}, %DB{db: db} = state) do
- Enum.each(batch, fn {k, v} ->
+ def handle_cast({:batch, batch, chunk, fun}, %DB{db: db, last_batch: lb} = state) do
+ {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
+ {:noreply, %DB{state | batch: batch, batch_chunk: chunk, batch_fun: fun, last_batch: lb}}
+ end
+
+ def handle_info(
+ :process_batch,
+ %DB{db: db, batch_chunk: chunk, batch_fun: fun, batch: batch, last_batch: lb} = state
+ ) do
+ {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
+ {:noreply, %DB{state | batch: batch, last_batch: lb}}
+ end
+
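+ # Applies the next `chunk` entries, then either clears the batch once everything
+ # has been written or remembers the new offset and schedules another
+ # :process_batch message so the next slice runs on a later pass through the mailbox.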
+ defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
+ batch
+ |> Enum.slice(last_batch, chunk)
+ |> Enum.each(fn {k, v} ->
do_upsert(db, k, v, fun)
end)
- {:noreply, state}
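+ # Past the end of the batch we are done and reset the bookkeeping; otherwise we
+ # remember the new offset and schedule the next chunk. A 0 ms send_after puts
+ # :process_batch at the back of the mailbox, so messages that arrived in the
+ # meantime are handled first.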
+ case last_batch + chunk do
+ lb when lb >= length(batch) ->
+ {[], 0}
+
+ lb ->
+ Process.send_after(self(), :process_batch, 0)
+ {batch, lb}
+ end
end
defp do_upsert(db, key, value, fun) do
def handle_info(
{:nodeup, node, info},
_,
- %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
+ %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.add_node(r, node)
- # sync_db(n, me, db, ring, repls)
+ populate_new_node(node, n, db, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
def handle_info(
{:nodedown, node, info},
_,
- %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
+ %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
) do
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.remove_node(r, node)
- # sync_db(n, me, db, ring, repls)
+ redistribute_old_node(n, db, r, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
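+ # A peer that detected the ring change pushes entries we should now hold; hand
+ # them to the DB process, which reuses the chunked batch path above (default
+ # chunk of 100).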
def handle_info({:handle_batch, batch}, @ready, %SL{data: %Ring{db: db}} = sl) do
+ Logger.info("Handling Batch")
DB.batch(db, batch)
{:ok, sl, []}
end
end)
end
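+ # Walk the local table and collect every {key, value} pair the freshly joined
+ # node owns under the updated ring, then ship the result over in one batch.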
+ def populate_new_node(node, name, db, ring, repls) do
+ batch =
+ db
+ |> DB.stream()
+ |> Enum.reduce(%{}, fn {k, v}, acc ->
+ nodes = HashRing.key_to_nodes(ring, k, repls)
+
+ case node in nodes do
+ true ->
+ Map.update(acc, node, [{k, v}], &[{k, v} | &1])
+
+ false ->
+ acc
+ end
+ end)
+
+ send_batch(name, batch)
+ end
+
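+ # Mirror image for a node that left: for every local key, find the nodes that
+ # gained ownership when the ring shrank and queue a copy of the entry for each.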
+ def redistribute_old_node(name, db, old_ring, new_ring, repls) do
+ batch =
+ db
+ |> DB.stream()
+ |> Enum.reduce(%{}, fn {k, v}, acc ->
+ old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
+ new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
+ # only the nodes that gained ownership of k need a copy; the rest already have it
+ nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
+
+ Enum.reduce(nodes, acc, fn n, a ->
+ Map.update(a, n, [{k, v}], &[{k, v} | &1])
+ end)
+ end)
+
+ send_batch(name, batch)
+ end
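+
+ # `send_batch/2` is not part of this change. A minimal sketch, assuming the ring
+ # process is registered under `name` on every node and that peers receive the
+ # {:handle_batch, batch} message handled above, might look like:
+ #
+ #   defp send_batch(name, batches) do
+ #     Enum.each(batches, fn {node, entries} ->
+ #       send({name, node}, {:handle_batch, entries})
+ #     end)
+ #   end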
+
defp maybe_reply(ref, reqs, val) do
case get_request(ref, reqs) do
{_, from, 1, res} ->