@handle_node_up "HandleNodeUp"
@handle_node_down "HandleNodeDown"
- @spec put(name :: module(), key :: String.t(), value :: term()) :: :ok
- def put(name, key, value) do
- :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
+ @spec put(name :: module(), keyspace :: String.t(), key :: String.t(), value :: term()) :: :ok
+ def put(name, keyspace, key, value) do
+ :gen_statem.cast(ClusterKV.ring_name(name), {:put, keyspace, key, value})
end
- @spec update(name :: module(), key :: String.t(), value :: any(), fun :: fun()) :: :ok
- def update(name, key, value, fun) do
- :gen_statem.cast(ClusterKV.ring_name(name), {:update, key, value, fun})
+ @spec update(
+ name :: module(),
+ keyspace :: String.t(),
+ key :: String.t(),
+ value :: any(),
+ fun :: fun()
+ ) :: :ok
+ def update(name, keyspace, key, value, fun) do
+ :gen_statem.cast(ClusterKV.ring_name(name), {:update, keyspace, key, value, fun})
end
@spec put_wildcard(
name :: module(),
- header :: String.t(),
+ keyspace :: String.t(),
key :: String.t(),
value :: term(),
split_on :: String.t(),
join :: String.t(),
wildcard :: String.t()
) :: :ok
- def put_wildcard(name, header, key, value, split_on, join, wildcard) do
+ def put_wildcard(name, keyspace, key, value, split_on, join, wildcard) do
:gen_statem.cast(
ClusterKV.ring_name(name),
- {:put_wildcard, header, key, value, split_on, join, wildcard}
+ {:put_wildcard, keyspace, key, value, split_on, join, wildcard}
)
end
- @spec batch(name :: module(), batch :: [DB.element()]) :: :ok
- def batch(name, batch) do
- :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch})
+ @spec batch(name :: module(), keyspace :: String.t(), batch :: [DB.element()]) :: :ok
+ def batch(name, keyspace, batch) do
+ :gen_statem.cast(ClusterKV.ring_name(name), {:batch, keyspace, batch})
end
- @spec get(name :: module(), key :: String.t(), timeout :: non_neg_integer() | :infinity) ::
+ @spec get(
+ name :: module(),
+ keyspace :: String.t(),
+ key :: String.t(),
+ timeout :: non_neg_integer() | :infinity
+ ) ::
DB.element() | :not_found
- def get(name, key, timeout) do
- :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
+ def get(name, keyspace, key, timeout) do
+ :gen_statem.call(ClusterKV.ring_name(name), {:get, keyspace, key}, timeout)
end
@spec prefix(
name :: module(),
+ keyspace :: String.t(),
key :: String.t(),
split_on :: String.t(),
min :: integer(),
timeout :: non_neg_integer() | :infinity
) :: [DB.element()]
- def prefix(name, key, split_on, min, timeout) do
- :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}, timeout)
+ def prefix(name, keyspace, key, split_on, min, timeout) do
+ :gen_statem.call(ClusterKV.ring_name(name), {:prefix, keyspace, key, split_on, min}, timeout)
end
@spec wildcard(
name :: module(),
- header :: String.t(),
+ keyspace :: String.t(),
key :: String.t(),
split_on :: String.t(),
join :: String.t(),
wildcard :: String.t(),
timeout :: non_neg_integer() | :infinity
) :: [DB.element()]
- def wildcard(name, header, key, split_on, join, wildcard, timeout) do
+ def wildcard(name, keyspace, key, split_on, join, wildcard, timeout) do
:gen_statem.call(
ClusterKV.ring_name(name),
- {:wildcard, header, key, split_on, join, wildcard},
+ {:wildcard, keyspace, key, split_on, join, wildcard},
timeout
)
end
end
def handle_call(
- {:get, key},
+ {:get, keyspace, key},
from,
@ready,
%SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
) do
+ key = "#{keyspace}:#{key}"
node = get_node(key, r, me, repls)
ref = make_ref()
send({n, node}, {:get_key, key, ref, me})
{:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
end
- def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+ def handle_call({:get, _keyspace, _key}, from, _, sl),
+ do: {:ok, sl, [{:reply, from, :no_quorum}]}
def handle_call(
- {:prefix, key, split_on, min},
+ {:prefix, keyspace, key, split_on, min},
from,
@ready,
%SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
) do
ref = make_ref()
+ key = "#{keyspace}:#{key}"
parts = String.split(key, split_on)
head = Enum.slice(parts, 0..(min - 1))
itr = Enum.slice(parts, min..-2)
{:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []}
end
- def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+ def handle_call({:prefix, _, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
def handle_call(
- {:wildcard, header, key, split_on, join, wildcard},
+ {:wildcard, keyspace, key, split_on, join, wildcard},
from,
@ready,
%SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
nodes =
Enum.map(Enum.with_index(parts), fn {k, i} ->
- k = Enum.join([header, k, i], join)
+ k = Enum.join([keyspace, k, i], join)
get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
end)
def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
def handle_cast(
- {:batch, batch},
+ {:batch, keyspace, batch},
_,
%SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
) do
batches =
- Enum.reduce(batch, %{}, fn {k, _v} = p, acc ->
+ Enum.reduce(batch, %{}, fn {k, v}, acc ->
+ k = "#{keyspace}:#{k}"
nodes = HashRing.key_to_nodes(r, k, repls)
+ el = {k, v}
Enum.reduce(nodes, acc, fn n, a ->
- Map.update(a, n, [p], &[p | &1])
+ Map.update(a, n, [el], &[el | &1])
end)
end)
end
def handle_cast(
- {:put_wildcard, header, key, value, split_on, join, wildcard},
+ {:put_wildcard, keyspace, key, value, split_on, join, wildcard},
_,
%SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
) do
key
|> String.split(split_on)
- |> send_wildcard(header, wildcard, join, value, n, r, repls)
+ |> send_wildcard(keyspace, wildcard, join, value, n, r, repls)
{:ok, sl, []}
end
def handle_cast(
- {:put, key, value},
+ {:put, keyspace, key, value},
_,
%SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
) do
+ key = "#{keyspace}:#{key}"
nodes = HashRing.key_to_nodes(r, key, repls)
send_sync(n, nodes, key, value)
{:ok, sl, []}
end
def handle_cast(
- {:update, key, value, fun},
+ {:update, keyspace, key, value, fun},
_,
%SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
) do
+ key = "#{keyspace}:#{key}"
nodes = HashRing.key_to_nodes(r, key, repls)
send_update(n, nodes, key, value, fun)
{:ok, sl, []}
end
- def handle_cast({:put, _key, _value}, _, sl), do: {:ok, sl, []}
+ def handle_cast({:put, _keyspace, _key, _value}, _, sl), do: {:ok, sl, []}
def handle_cast(_, _, sl), do: {:ok, sl, []}
end
def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do
- val = DB.get(db, key)
- send({n, node}, {:reply, val, ref, me})
+ send({n, node}, {:reply, get_key(db, key), ref, me})
{:ok, sl, []}
end
%SL{data: %Ring{name: n, db: db, node: me}} = sl
) do
val =
- case DB.get(db, key) do
+ case get_key(db, key) do
:not_found ->
:not_found
node
end
+ def get_key(db, key) do
+ case DB.get(db, key) do
+ :not_found ->
+ :not_found
+
+ {id, val} ->
+ [_keyspace, key] = String.split(id, ":", parts: 2)
+ {key, val}
+ end
+ end
+
@spec get_wildcard(
key :: String.t(),
parts :: [String.t()],
_ -> true
end)
- @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok
+ @spec send_sync(
+ name :: module(),
+ nodes :: [node()],
+ key :: {String.t(), String.t()},
+ value :: any()
+ ) :: :ok
defp send_sync(name, nodes, key, value) do
Enum.each(nodes, fn n ->
send({name, n}, {:sync, key, value})