@db_options [:set]
- defstruct [:db, :batch_chunk, :batch_fun, last_batch: 0, batch: []]
+ defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []]
@type t :: %__MODULE__{
db: :ets.tid() | atom(),
batch: list(),
batch_chunk: integer(),
batch_fun: fun(),
+ batch_ref: reference(),
last_batch: integer()
}
{:noreply, state}
end
- def handle_cast({:batch, batch, chunk, fun}, %DB{batch: b} = state) do
+ def handle_cast({:batch, batch, chunk, fun}, %DB{batch_ref: ref, batch: b} = state) do
Logger.debug("DB handling BATCH")
batch = b ++ batch
- Process.send_after(self(), :process_batch, 0)
- {:noreply, %DB{state | batch: batch, batch_chunk: chunk, batch_fun: fun}}
+
+ ref =
+ case ref do
+ nil -> Process.send_after(self(), :process_batch, 0)
+ r -> r
+ end
+
+ {:noreply, %DB{state | batch_ref: ref, batch: batch, batch_chunk: chunk, batch_fun: fun}}
end
def handle_info(
:process_batch,
- %DB{db: db, batch_chunk: chunk, batch_fun: fun, batch: batch, last_batch: lb} = state
+ %DB{
+ db: db,
+ batch_chunk: chunk,
+ batch_fun: fun,
+ batch: batch,
+ last_batch: lb
+ } = state
) do
- {batch, lb} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
- {:noreply, %DB{state | batch: batch, last_batch: lb}}
+ {batch, lb, ref} = handle_next_batch_chunk(db, batch, chunk, lb, fun)
+ {:noreply, %DB{state | batch_ref: ref, batch: batch, last_batch: lb}}
end
defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
case last_batch + chunk do
lb when lb > length(batch) ->
- {[], 0}
+ Logger.debug("Batch Processed")
+ {[], 0, nil}
lb ->
- Process.send_after(self(), :process_batch, 0)
- Logger.debug("Batch Processed")
- {batch, lb}
+ ref = Process.send_after(self(), :process_batch, 0)
+ {batch, lb, ref}
end
end