]> Entropealabs - cluster_kv.git/commitdiff
refinebaatch processing
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 21:53:45 +0000 (16:53 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 21:53:45 +0000 (16:53 -0500)
lib/cluster_kv/db.ex

index 37b7eafb8b677148004a7762fac85dab667716b7..a864db884e55f8d1a19dfc4454c1a2accdbe118b 100644 (file)
@@ -6,13 +6,14 @@ defmodule ClusterKV.DB do
 
   @db_options [:set]
 
-  defstruct [:db, :batch_chunk, :batch_fun, last_batch: 0, batch: []]
+  defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []]
 
   @type t :: %__MODULE__{
           db: :ets.tid() | atom(),
           batch: list(),
           batch_chunk: integer(),
           batch_fun: fun(),
+          batch_ref: reference(),
           last_batch: integer()
         }
 
@@ -70,19 +71,31 @@ defmodule ClusterKV.DB do
     {:noreply, state}
   end
 
-  def handle_cast({:batch, batch, chunk, fun}, %DB{batch: b} = state) do
+  def handle_cast({:batch, batch, chunk, fun}, %DB{batch_ref: ref, batch: b} = state) do
     Logger.debug("DB handling BATCH")
     batch = b ++ batch
-    Process.send_after(self(), :process_batch, 0)
-    {:noreply, %DB{state | batch: batch, batch_chunk: chunk, batch_fun: fun}}
+
+    ref =
+      case ref do
+        nil -> Process.send_after(self(), :process_batch, 0)
+        r -> r
+      end
+
+    {:noreply, %DB{state | batch_ref: ref, batch: batch, batch_chunk: chunk, batch_fun: fun}}
   end
 
   def handle_info(
         :process_batch,
-        %DB{db: db, batch_chunk: chunk, batch_fun: fun, batch: batch, last_batch: lb} = state
+        %DB{
+          db: db,
+          batch_chunk: chunk,
+          batch_fun: fun,
+          batch: batch,
+          last_batch: lb
+        } = state
       ) do
-    {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
-    {:noreply, %DB{state | batch: batch, last_batch: lb}}
+    {batch, lb, ref} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
+    {:noreply, %DB{state | batch_ref: ref, batch: batch, last_batch: lb}}
   end
 
   defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
@@ -96,12 +109,12 @@ defmodule ClusterKV.DB do
 
     case last_batch + chunk do
       lb when lb > length(batch) ->
-        {[], 0}
+        Logger.debug("Batch Processed")
+        {[], 0, nil}
 
       lb ->
-        Process.send_after(self(), :process_batch, 0)
-        Logger.debug("Batch Processed")
-        {batch, lb}
+        ref = Process.send_after(self(), :process_batch, 0)
+        {batch, lb, ref}
     end
   end