From 28907bcc914807eb648c376892f4327ae3923fb4 Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 7 Mar 2020 20:44:00 -0600 Subject: [PATCH] add batch endpoint --- lib/cluster_kv.ex | 1 + lib/cluster_kv/db.ex | 27 +++++++++++++++++++++------ lib/cluster_kv/ring.ex | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index d00aac0..71689a6 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -44,6 +44,7 @@ defmodule ClusterKV do defdelegate get(name, key, timeout \\ :infinity), to: Ring defdelegate put(name, key, value), to: Ring + defdelegate batch(name, batch), to: Ring defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring defdelegate stream(name, timeout \\ :infinity), to: Ring diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index f3be208..6012b93 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -13,8 +13,8 @@ defmodule ClusterKV.DB do } @spec get(name :: module(), key :: binary()) :: [tuple()] - def get(name, key) do - GenServer.call(name, {:get, key}) + def get(name, key, timeout \\ :infinity) do + GenServer.call(name, {:get, key}, timeout) end @spec put(name :: module(), key :: binary(), value :: term()) :: :ok @@ -27,8 +27,12 @@ defmodule ClusterKV.DB do GenServer.cast(name, {:upsert, key, value, fun}) end - def stream(name) do - GenServer.call(name, :stream) + def batch(name, batch, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do + GenServer.cast(name, {:batch, batch, fun}) + end + + def stream(name, timeout \\ :infinity) do + GenServer.call(name, :stream, timeout) end def start_link(name: name) do @@ -55,6 +59,19 @@ defmodule ClusterKV.DB do end def handle_cast({:upsert, key, value, fun}, %DB{db: db} = state) do + do_upsert(db, key, value, fun) + {:noreply, state} + end + + def handle_cast({:batch, batch, fun}, %DB{db: db} = state) do + Enum.each(batch, fn {k, v} -> + :ets.insert(db, {k, [v]}) + end) + + {:noreply, state} + end + + defp do_upsert(db, key, value, fun) do case :ets.lookup(db, key) do [] -> :ets.insert(db, {key, [value]}) @@ -63,8 +80,6 @@ defmodule ClusterKV.DB do new = fun.(old, value) 1 = :ets.select_replace(db, [{old, [], [{:const, new}]}]) end - - {:noreply, state} end defp do_get(db, key) do diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index b22deba..7bf732c 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -55,6 +55,10 @@ defmodule ClusterKV.Ring do ) end + def batch(name, batch) do + :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch}) + end + def get(name, key, timeout) do :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout) end @@ -180,6 +184,27 @@ defmodule ClusterKV.Ring do def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} + def handle_cast( + {:batch, batch}, + @ready, + %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl + ) do + Logger.info("Batching...") + + batches = + Enum.reduce(batch, %{}, fn {k, _v} = p, acc -> + nodes = HashRing.key_to_nodes(r, k, repls) + Logger.info("nodes") + + Enum.reduce(nodes, acc, fn n, a -> + Map.update(a, n, [p], &[p | &1]) + end) + end) + + send_batch(n, batches) + {:ok, sl, []} + end + def handle_cast( {:put_wildcard, header, key, value, split_on, join, wildcard}, @ready, @@ -274,6 +299,11 @@ defmodule ClusterKV.Ring do {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end + def handle_info({:handle_batch, batch}, @ready, %SL{data: %Ring{db: db}} = sl) do + DB.batch(db, batch) + {:ok, sl, []} + end + defp get_prefix(next, r, me, repls, n, ref) do prefix = Enum.join(next, ".") node = get_node(prefix, r, me, repls) @@ -285,6 +315,14 @@ defmodule ClusterKV.Ring do send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me}) end + def send_batch(name, batches) do + Logger.info("Sending batches: #{inspect(batches)}") + + Enum.each(batches, fn {node, batch} -> + send({name, node}, {:handle_batch, batch}) + end) + end + def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do parts |> Enum.with_index() -- 2.45.3