]> Entropealabs - cluster_kv.git/commitdiff
add batch endpoint
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 02:44:00 +0000 (20:44 -0600)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 02:44:00 +0000 (20:44 -0600)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index d00aac0c00ff56bd364907df21448c52bb0a92ad..71689a60ee901fb04e6f676dcf8273cacedc1fb0 100644 (file)
@@ -44,6 +44,7 @@ defmodule ClusterKV do
 
   defdelegate get(name, key, timeout \\ :infinity), to: Ring
   defdelegate put(name, key, value), to: Ring
+  defdelegate batch(name, batch), to: Ring
   defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring
   defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring
   defdelegate stream(name, timeout \\ :infinity), to: Ring
index f3be208f061a50858f7ced2b75de2f60801055d9..6012b93946c164259c2931f05d7b57e6abe6af7d 100644 (file)
@@ -13,8 +13,8 @@ defmodule ClusterKV.DB do
         }
 
   @spec get(name :: module(), key :: binary()) :: [tuple()]
-  def get(name, key) do
-    GenServer.call(name, {:get, key})
+  def get(name, key, timeout \\ :infinity) do
+    GenServer.call(name, {:get, key}, timeout)
   end
 
   @spec put(name :: module(), key :: binary(), value :: term()) :: :ok
@@ -27,8 +27,12 @@ defmodule ClusterKV.DB do
     GenServer.cast(name, {:upsert, key, value, fun})
   end
 
-  def stream(name) do
-    GenServer.call(name, :stream)
+  def batch(name, batch, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
+    GenServer.cast(name, {:batch, batch, fun})
+  end
+
+  def stream(name, timeout \\ :infinity) do
+    GenServer.call(name, :stream, timeout)
   end
 
   def start_link(name: name) do
@@ -55,6 +59,19 @@ defmodule ClusterKV.DB do
   end
 
   def handle_cast({:upsert, key, value, fun}, %DB{db: db} = state) do
+    do_upsert(db, key, value, fun)
+    {:noreply, state}
+  end
+
+  def handle_cast({:batch, batch, fun}, %DB{db: db} = state) do
+    Enum.each(batch, fn {k, v} ->
+      :ets.insert(db, {k, [v]})
+    end)
+
+    {:noreply, state}
+  end
+
+  defp do_upsert(db, key, value, fun) do
     case :ets.lookup(db, key) do
       [] ->
         :ets.insert(db, {key, [value]})
@@ -63,8 +80,6 @@ defmodule ClusterKV.DB do
         new = fun.(old, value)
         1 = :ets.select_replace(db, [{old, [], [{:const, new}]}])
     end
-
-    {:noreply, state}
   end
 
   defp do_get(db, key) do
index b22deba0298a16232358d7ff3cf03adce0ce25bc..7bf732ce1dcb54d3fd920dadc83fb66bcaaddad5 100644 (file)
@@ -55,6 +55,10 @@ defmodule ClusterKV.Ring do
     )
   end
 
+  def batch(name, batch) do
+    :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch})
+  end
+
   def get(name, key, timeout) do
     :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
   end
@@ -180,6 +184,27 @@ defmodule ClusterKV.Ring do
 
   def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
+  def handle_cast(
+        {:batch, batch},
+        @ready,
+        %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+      ) do
+    Logger.info("Batching...")
+
+    batches =
+      Enum.reduce(batch, %{}, fn {k, _v} = p, acc ->
+        nodes = HashRing.key_to_nodes(r, k, repls)
+        Logger.info("nodes")
+
+        Enum.reduce(nodes, acc, fn n, a ->
+          Map.update(a, n, [p], &[p | &1])
+        end)
+      end)
+
+    send_batch(n, batches)
+    {:ok, sl, []}
+  end
+
   def handle_cast(
         {:put_wildcard, header, key, value, split_on, join, wildcard},
         @ready,
@@ -274,6 +299,11 @@ defmodule ClusterKV.Ring do
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
   end
 
+  def handle_info({:handle_batch, batch}, @ready, %SL{data: %Ring{db: db}} = sl) do
+    DB.batch(db, batch)
+    {:ok, sl, []}
+  end
+
   defp get_prefix(next, r, me, repls, n, ref) do
     prefix = Enum.join(next, ".")
     node = get_node(prefix, r, me, repls)
@@ -285,6 +315,14 @@ defmodule ClusterKV.Ring do
     send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
   end
 
+  def send_batch(name, batches) do
+    Logger.info("Sending batches: #{inspect(batches)}")
+
+    Enum.each(batches, fn {node, batch} ->
+      send({name, node}, {:handle_batch, batch})
+    end)
+  end
+
   def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do
     parts
     |> Enum.with_index()