From dfa882afbba8a3f000e2ee9c401dbf5fb76041a5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Christopher=20Cot=C3=A9?= Date: Wed, 27 Jan 2021 14:53:47 -0600 Subject: [PATCH] update batch upsert function --- lib/cluster_kv/db.ex | 2 +- lib/cluster_kv/ring.ex | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 721414f..734e70c 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -73,7 +73,7 @@ defmodule ClusterKV.DB do chunk :: integer(), fun :: update_function() ) :: :ok - def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do + def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), Enum.uniq([elem(&1, 1) | &2])}) do :poolboy.transaction(name, fn w -> GenServer.cast(w, {:batch, batch, chunk, fun}) end) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 52267c1..198959e 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -478,7 +478,7 @@ defmodule ClusterKV.Ring do Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}") Enum.each(batches, fn {node, batch} -> - Logger.debug("Sending #{length(batch)} reccords to #{inspect(node)}") + Logger.debug("Sending #{length(batch)} records to #{inspect(node)}") send({name, node}, {:handle_batch, batch}) end) end @@ -598,7 +598,13 @@ defmodule ClusterKV.Ring do end) end - @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok + @spec send_update( + name :: module(), + nodes :: [node()], + key :: String.t(), + value :: any(), + fun :: any() + ) :: :ok defp send_update(name, nodes, key, value, fun) do Enum.each(nodes, fn n -> send({name, n}, {:update, key, value, fun}) -- 2.45.3