defdelegate get(name, key, timeout \\ :infinity), to: Ring
defdelegate put(name, key, value), to: Ring
+ defdelegate batch(name, batch), to: Ring
defdelegate put_wildcard(name, header, key, value, split_on, join, wildcard), to: Ring
defdelegate prefix(name, key, split_on, min, timeout \\ :infinity), to: Ring
defdelegate stream(name, timeout \\ :infinity), to: Ring
}
@spec get(name :: module(), key :: binary()) :: [tuple()]
- def get(name, key) do
- GenServer.call(name, {:get, key})
+ def get(name, key, timeout \\ :infinity) do
+ GenServer.call(name, {:get, key}, timeout)
end
@spec put(name :: module(), key :: binary(), value :: term()) :: :ok
GenServer.cast(name, {:upsert, key, value, fun})
end
- def stream(name) do
- GenServer.call(name, :stream)
+ def batch(name, batch, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
+ GenServer.cast(name, {:batch, batch, fun})
+ end
+
+ def stream(name, timeout \\ :infinity) do
+ GenServer.call(name, :stream, timeout)
end
def start_link(name: name) do
end
def handle_cast({:upsert, key, value, fun}, %DB{db: db} = state) do
+ do_upsert(db, key, value, fun)
+ {:noreply, state}
+ end
+
+ def handle_cast({:batch, batch, fun}, %DB{db: db} = state) do
+ Enum.each(batch, fn {k, v} ->
+ :ets.insert(db, {k, [v]})
+ end)
+
+ {:noreply, state}
+ end
+
+ defp do_upsert(db, key, value, fun) do
case :ets.lookup(db, key) do
[] ->
:ets.insert(db, {key, [value]})
new = fun.(old, value)
1 = :ets.select_replace(db, [{old, [], [{:const, new}]}])
end
-
- {:noreply, state}
end
defp do_get(db, key) do
)
end
+ def batch(name, batch) do
+ :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch})
+ end
+
def get(name, key, timeout) do
:gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
end
def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+ def handle_cast(
+ {:batch, batch},
+ @ready,
+ %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+ ) do
+ Logger.info("Batching...")
+
+ batches =
+ Enum.reduce(batch, %{}, fn {k, _v} = p, acc ->
+ nodes = HashRing.key_to_nodes(r, k, repls)
+ Logger.info("nodes")
+
+ Enum.reduce(nodes, acc, fn n, a ->
+ Map.update(a, n, [p], &[p | &1])
+ end)
+ end)
+
+ send_batch(n, batches)
+ {:ok, sl, []}
+ end
+
def handle_cast(
{:put_wildcard, header, key, value, split_on, join, wildcard},
@ready,
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
+ def handle_info({:handle_batch, batch}, @ready, %SL{data: %Ring{db: db}} = sl) do
+ DB.batch(db, batch)
+ {:ok, sl, []}
+ end
+
defp get_prefix(next, r, me, repls, n, ref) do
prefix = Enum.join(next, ".")
node = get_node(prefix, r, me, repls)
send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
end
+ def send_batch(name, batches) do
+ Logger.info("Sending batches: #{inspect(batches)}")
+
+ Enum.each(batches, fn {node, batch} ->
+ send({name, node}, {:handle_batch, batch})
+ end)
+ end
+
def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do
parts
|> Enum.with_index()