From: Christopher Date: Sun, 8 Mar 2020 21:53:45 +0000 (-0500) Subject: refinebaatch processing X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=1b02c1f266feed18cf551af7bf911fde4689917e;p=cluster_kv.git refinebaatch processing --- diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 37b7eaf..a864db8 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -6,13 +6,14 @@ defmodule ClusterKV.DB do @db_options [:set] - defstruct [:db, :batch_chunk, :batch_fun, last_batch: 0, batch: []] + defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []] @type t :: %__MODULE__{ db: :ets.tid() | atom(), batch: list(), batch_chunk: integer(), batch_fun: fun(), + batch_ref: reference(), last_batch: integer() } @@ -70,19 +71,31 @@ defmodule ClusterKV.DB do {:noreply, state} end - def handle_cast({:batch, batch, chunk, fun}, %DB{batch: b} = state) do + def handle_cast({:batch, batch, chunk, fun}, %DB{batch_ref: ref, batch: b} = state) do Logger.debug("DB handling BATCH") batch = b ++ batch - Process.send_after(self(), :process_batch, 0) - {:noreply, %DB{state | batch: batch, batch_chunk: chunk, batch_fun: fun}} + + ref = + case ref do + nil -> Process.send_after(self(), :process_batch, 0) + r -> r + end + + {:noreply, %DB{state | batch_ref: ref, batch: batch, batch_chunk: chunk, batch_fun: fun}} end def handle_info( :process_batch, - %DB{db: db, batch_chunk: chunk, batch_fun: fun, batch: batch, last_batch: lb} = state + %DB{ + db: db, + batch_chunk: chunk, + batch_fun: fun, + batch: batch, + last_batch: lb + } = state ) do - {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun) - {:noreply, %DB{state | batch: batch, last_batch: lb}} + {batch, lb, ref} = handle_next_batch_chunk(db, batch, chunk, lb, fun) + {:noreply, %DB{state | batch_ref: ref, batch: batch, last_batch: lb}} end defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do @@ -96,12 +109,12 @@ defmodule ClusterKV.DB do case last_batch + chunk do lb when lb > length(batch) -> - {[], 0} + Logger.debug("Batch Processed") + {[], 0, nil} lb -> - Process.send_after(self(), :process_batch, 0) - Logger.debug("Batch Processed") - {batch, lb} + ref = Process.send_after(self(), :process_batch, 0) + {batch, lb, ref} end end