defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []]
+ @type element :: {key :: String.t(), value :: any()}
+
@type t :: %__MODULE__{
db: :ets.tid() | atom(),
- batch: list(),
+ batch: [element()],
batch_chunk: integer(),
batch_fun: fun(),
batch_ref: reference(),
last_batch: integer()
}
- @spec get(name :: module(), key :: binary()) :: [tuple()]
+ @spec get(name :: module(), key :: binary(), timeout :: non_neg_integer() | :infinity) ::
+ element() | :not_found
def get(name, key, timeout \\ :infinity) do
GenServer.call(name, {:get, key}, timeout)
end
GenServer.cast(name, {:upsert, key, value, fun})
end
+ @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok
def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do
GenServer.cast(name, {:batch, batch, chunk, fun})
end
+ @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t()
def stream(name, timeout \\ :infinity) do
GenServer.call(name, :stream, timeout)
end
+ @spec start_link(name: module()) :: GenServer.on_start()
def start_link(name: name) do
GenServer.start_link(__MODULE__, name, name: name)
end
{:noreply, %DB{state | batch_ref: ref, batch: batch, last_batch: lb}}
end
+ @spec handle_next_batch_chunk(
+ db :: reference(),
+ batch :: [element()],
+ chunk :: integer(),
+ last_batch :: integer(),
+ fun :: fun()
+ ) :: {[element()], integer(), reference()}
defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
Logger.debug("processing batch of #{length(batch)} from #{last_batch} with chunk of #{chunk}")
end
end
+ @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: true
defp do_upsert(db, key, value, fun) do
case :ets.lookup(db, key) do
[] ->
end
end
+ @spec do_get(db :: module(), key :: String.t()) :: element() | :not_found
defp do_get(db, key) do
case :ets.lookup(db, key) do
[] -> :not_found
end
end
+ @spec get_stream(db :: module()) :: Enumerable.t()
defp get_stream(db) do
:ets.safe_fixtable(db, true)
use StatesLanguage, data: "priv/ring.json"
require Logger
alias StatesLanguage, as: SL
- alias ClusterKV.{Ring, DB}
+ alias ClusterKV.{DB, Requests, Ring}
@enforce_keys [:name, :replicas, :quorum, :db]
defstruct [
db: module(),
node: node(),
ring: HashRing.t(),
+ requests: [Requests.t()],
anti_entropy_interval: integer()
}
@init "Init"
@ready "Ready"
@quorum "Quorum"
- @anti_entropy "AntiEntropy"
# Resources
@handle_init "HandleInit"
@await_quorum "AwaitQuorum"
@handle_node_up "HandleNodeUp"
@handle_node_down "HandleNodeDown"
- @handle_anti_entropy "HandleAntiEntropy"
+ @spec put(name :: module(), key :: String.t(), value :: term()) :: :ok
def put(name, key, value) do
:gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
end
+ @spec put_wildcard(
+ name :: module(),
+ header :: String.t(),
+ key :: String.t(),
+ value :: term(),
+ split_on :: String.t(),
+ join :: String.t(),
+ wildcard :: String.t()
+ ) :: :ok
def put_wildcard(name, header, key, value, split_on, join, wildcard) do
:gen_statem.cast(
ClusterKV.ring_name(name),
)
end
+ @spec batch(name :: module(), batch :: [DB.element()]) :: :ok
def batch(name, batch) do
:gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch})
end
+ @spec get(name :: module(), key :: String.t(), timeout :: non_neg_integer() | :infinity) ::
+ DB.element() | :not_found
def get(name, key, timeout) do
:gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
end
+ @spec prefix(
+ name :: module(),
+ key :: String.t(),
+ split_on :: String.t(),
+ min :: integer(),
+ timeout :: non_neg_integer() | :infinity
+ ) :: [DB.element()]
def prefix(name, key, split_on, min, timeout) do
:gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}, timeout)
end
+ @spec wildcard(
+ name :: module(),
+ header :: String.t(),
+ key :: String.t(),
+ split_on :: String.t(),
+ join :: String.t(),
+ wildcard :: String.t(),
+ timeout :: non_neg_integer() | :infinity
+ ) :: [DB.element()]
def wildcard(name, header, key, split_on, join, wildcard, timeout) do
:gen_statem.call(
ClusterKV.ring_name(name),
)
end
+ @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t()
def stream(name, timeout) do
:gen_statem.call(ClusterKV.ring_name(name), :stream, timeout)
end
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.remove_node(r, node)
redistribute_data(n, me, db, r, ring, repls)
- reqs = remove_node_from_requests(reqs, node)
+ reqs = Requests.remove_node(reqs, node)
{requests, actions} =
Enum.reduce(reqs, {reqs, [{:next_event, :internal, :node_down}]}, fn {ref, _fr, _nodes,
{:ok, sl, []}
end
+ @spec get_prefix(
+ next :: [String.t()],
+ ring :: HashRing.t(),
+ me :: node(),
+ replicas :: non_neg_integer(),
+ name :: module(),
+ reference :: reference()
+ ) :: node()
defp get_prefix(next, r, me, repls, n, ref) do
prefix = Enum.join(next, ".")
node = get_node(prefix, r, me, repls)
node
end
+ @spec get_wildcard(
+ key :: String.t(),
+ parts :: [String.t()],
+ wildcard :: String.t(),
+ ring :: HashRing.t(),
+ me :: node(),
+ replicas :: non_neg_integer(),
+ name :: module(),
+ ref :: reference()
+ ) :: node()
defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do
node = get_node(key, r, me, repls)
send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
node
end
+ @spec send_batch(name :: module(), batches :: map()) :: :ok
def send_batch(name, batches) do
Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}")
end)
end
+ @spec send_wildcard(
+ parts :: [String.t()],
+ header :: String.t(),
+ wildcard :: String.t(),
+ join :: String.t(),
+ value :: any(),
+ name :: module(),
+ ring :: HashRing.t(),
+ replicas :: non_neg_integer()
+ ) :: [:ok]
def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do
parts
|> Enum.with_index()
end)
end
+ @spec redistribute_data(
+ name :: module(),
+ me :: node(),
+ db :: module(),
+ old_ring :: HashRing.t(),
+ new_ring :: HashRing.t(),
+ replicas :: non_neg_integer()
+ ) :: {:ok, pid()}
def redistribute_data(name, me, db, old_ring, new_ring, repls) do
Task.start(fn ->
batch =
end)
end
+ @spec maybe_reply(ref :: reference(), requests :: [Requests.t()], val :: any(), node :: node()) ::
+ {[Requests.t()], [:gen_statem.action()]}
defp maybe_reply(ref, reqs, val, node) do
- case get_request(ref, reqs) do
+ case Requests.get(ref, reqs) do
{_, from, [_], res} ->
- {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+ {Requests.remove(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
{_, from, [], res} ->
- {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+ {Requests.remove(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
_ ->
- {decrement_request(ref, reqs, val, node), []}
+ {Requests.decrement(ref, reqs, val, node), []}
end
end
+ @spec get_final_value(result :: [DB.element()], value :: any()) :: [DB.element()] | DB.element()
defp get_final_value([], val), do: val
defp get_final_value(res, val) when is_list(res),
_ -> true
end)
- defp get_request(ref, reqs) do
- Enum.find(reqs, fn
- {^ref, _, _, _} -> true
- _ -> false
- end)
- end
-
- defp remove_node_from_requests(reqs, node) do
- Enum.map(reqs, fn {ref, fr, nodes, res} ->
- {ref, fr, List.delete(nodes, node), res}
- end)
- end
-
- defp decrement_request(ref, reqs, val, from) do
- Enum.map(reqs, fn
- {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]}
- other -> other
- end)
- end
-
- defp remove_request(ref, requests) do
- Enum.filter(requests, fn
- {^ref, _, _, _} -> false
- {_, _, _, _} -> true
- end)
- end
-
+ @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok
defp send_sync(name, nodes, key, value) do
Enum.each(nodes, fn n ->
send({name, n}, {:sync, key, value})
end)
end
+ @spec get_existing_nodes(ring :: HashRing.t()) :: HashRing.t()
defp get_existing_nodes(ring) do
Enum.reduce(Node.list(), ring, fn n, acc ->
HashRing.add_node(acc, n)
end)
end
+ @spec get_node(
+ key :: String.t(),
+ ring :: HashRing.t(),
+ node :: node(),
+ replicas :: non_neg_integer()
+ ) ::
+ node()
defp get_node(key, r, node, repls) do
nodes = HashRing.key_to_nodes(r, key, repls)