From ca564fad7e4b00e4978b9d33cc25ce23bb36bc9f Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 9 Mar 2020 12:32:27 -0500 Subject: [PATCH] more specs and dialyzer pass --- lib/cluster_kv/db.ex | 20 +++++- lib/cluster_kv/requests.ex | 46 ++++++++++++++ lib/cluster_kv/ring.ex | 121 ++++++++++++++++++++++++++----------- mix.exs | 1 + mix.lock | 1 + 5 files changed, 152 insertions(+), 37 deletions(-) create mode 100644 lib/cluster_kv/requests.ex diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 74391c5..8f1e17f 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -8,16 +8,19 @@ defmodule ClusterKV.DB do defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []] + @type element :: {key :: String.t(), value :: any()} + @type t :: %__MODULE__{ db: :ets.tid() | atom(), - batch: list(), + batch: [element()], batch_chunk: integer(), batch_fun: fun(), batch_ref: reference(), last_batch: integer() } - @spec get(name :: module(), key :: binary()) :: [tuple()] + @spec get(name :: module(), key :: binary(), timeout :: non_neg_integer() | :infinity) :: + element() | :not_found def get(name, key, timeout \\ :infinity) do GenServer.call(name, {:get, key}, timeout) end @@ -32,14 +35,17 @@ defmodule ClusterKV.DB do GenServer.cast(name, {:upsert, key, value, fun}) end + @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do GenServer.cast(name, {:batch, batch, chunk, fun}) end + @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t() def stream(name, timeout \\ :infinity) do GenServer.call(name, :stream, timeout) end + @spec start_link(name: module()) :: GenServer.on_start() def start_link(name: name) do GenServer.start_link(__MODULE__, name, name: name) end @@ -98,6 +104,13 @@ defmodule ClusterKV.DB do {:noreply, %DB{state | batch_ref: ref, batch: batch, last_batch: lb}} end + @spec handle_next_batch_chunk( + db :: reference(), + batch :: [element()], + chunk :: integer(), + last_batch :: integer(), + fun :: fun() + ) :: {[element()], integer(), reference()} defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do Logger.debug("processing batch of #{length(batch)} from #{last_batch} with chunk of #{chunk}") @@ -118,6 +131,7 @@ defmodule ClusterKV.DB do end end + @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: true defp do_upsert(db, key, value, fun) do case :ets.lookup(db, key) do [] -> @@ -138,6 +152,7 @@ defmodule ClusterKV.DB do end end + @spec do_get(db :: module(), key :: String.t()) :: element() | :not_found defp do_get(db, key) do case :ets.lookup(db, key) do [] -> :not_found @@ -145,6 +160,7 @@ defmodule ClusterKV.DB do end end + @spec get_stream(db :: module()) :: Enumerable.t() defp get_stream(db) do :ets.safe_fixtable(db, true) diff --git a/lib/cluster_kv/requests.ex b/lib/cluster_kv/requests.ex new file mode 100644 index 0000000..62c1c1b --- /dev/null +++ b/lib/cluster_kv/requests.ex @@ -0,0 +1,46 @@ +defmodule ClusterKV.Requests do + @moduledoc """ + Type and functions for working with requests + """ + alias ClusterKV.DB + + @type t :: + {reference :: reference(), from :: GenServer.from(), nodes :: [node()], + responses :: [DB.element()]} + + @spec get(reference :: reference(), requests :: [t()]) :: t() | nil + def get(ref, reqs) do + Enum.find(reqs, fn + {^ref, _, _, _} -> true + _ -> false + end) + end + + @spec remove_node(requests :: [t()], node :: node()) :: [t()] + def remove_node(reqs, node) do + Enum.map(reqs, fn {ref, fr, nodes, res} -> + {ref, fr, List.delete(nodes, node), res} + end) + end + + @spec decrement( + reference :: reference(), + requests :: [t()], + value :: any(), + from :: node() + ) :: [t()] + def decrement(ref, reqs, val, from) do + Enum.map(reqs, fn + {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]} + other -> other + end) + end + + @spec remove(reference :: reference(), requests :: [t()]) :: [t()] + def remove(ref, requests) do + Enum.filter(requests, fn + {^ref, _, _, _} -> false + {_, _, _, _} -> true + end) + end +end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 2924bf8..ac6bc31 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -3,7 +3,7 @@ defmodule ClusterKV.Ring do use StatesLanguage, data: "priv/ring.json" require Logger alias StatesLanguage, as: SL - alias ClusterKV.{Ring, DB} + alias ClusterKV.{DB, Requests, Ring} @enforce_keys [:name, :replicas, :quorum, :db] defstruct [ @@ -23,6 +23,7 @@ defmodule ClusterKV.Ring do db: module(), node: node(), ring: HashRing.t(), + requests: [Requests.t()], anti_entropy_interval: integer() } @@ -30,7 +31,6 @@ defmodule ClusterKV.Ring do @init "Init" @ready "Ready" @quorum "Quorum" - @anti_entropy "AntiEntropy" # Resources @handle_init "HandleInit" @@ -38,12 +38,21 @@ defmodule ClusterKV.Ring do @await_quorum "AwaitQuorum" @handle_node_up "HandleNodeUp" @handle_node_down "HandleNodeDown" - @handle_anti_entropy "HandleAntiEntropy" + @spec put(name :: module(), key :: String.t(), value :: term()) :: :ok def put(name, key, value) do :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value}) end + @spec put_wildcard( + name :: module(), + header :: String.t(), + key :: String.t(), + value :: term(), + split_on :: String.t(), + join :: String.t(), + wildcard :: String.t() + ) :: :ok def put_wildcard(name, header, key, value, split_on, join, wildcard) do :gen_statem.cast( ClusterKV.ring_name(name), @@ -51,18 +60,37 @@ defmodule ClusterKV.Ring do ) end + @spec batch(name :: module(), batch :: [DB.element()]) :: :ok def batch(name, batch) do :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch}) end + @spec get(name :: module(), key :: String.t(), timeout :: non_neg_integer() | :infinity) :: + DB.element() | :not_found def get(name, key, timeout) do :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout) end + @spec prefix( + name :: module(), + key :: String.t(), + split_on :: String.t(), + min :: integer(), + timeout :: non_neg_integer() | :infinity + ) :: [DB.element()] def prefix(name, key, split_on, min, timeout) do :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}, timeout) end + @spec wildcard( + name :: module(), + header :: String.t(), + key :: String.t(), + split_on :: String.t(), + join :: String.t(), + wildcard :: String.t(), + timeout :: non_neg_integer() | :infinity + ) :: [DB.element()] def wildcard(name, header, key, split_on, join, wildcard, timeout) do :gen_statem.call( ClusterKV.ring_name(name), @@ -71,6 +99,7 @@ defmodule ClusterKV.Ring do ) end + @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t() def stream(name, timeout) do :gen_statem.call(ClusterKV.ring_name(name), :stream, timeout) end @@ -297,7 +326,7 @@ defmodule ClusterKV.Ring do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) redistribute_data(n, me, db, r, ring, repls) - reqs = remove_node_from_requests(reqs, node) + reqs = Requests.remove_node(reqs, node) {requests, actions} = Enum.reduce(reqs, {reqs, [{:next_event, :internal, :node_down}]}, fn {ref, _fr, _nodes, @@ -321,6 +350,14 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end + @spec get_prefix( + next :: [String.t()], + ring :: HashRing.t(), + me :: node(), + replicas :: non_neg_integer(), + name :: module(), + reference :: reference() + ) :: node() defp get_prefix(next, r, me, repls, n, ref) do prefix = Enum.join(next, ".") node = get_node(prefix, r, me, repls) @@ -328,12 +365,23 @@ defmodule ClusterKV.Ring do node end + @spec get_wildcard( + key :: String.t(), + parts :: [String.t()], + wildcard :: String.t(), + ring :: HashRing.t(), + me :: node(), + replicas :: non_neg_integer(), + name :: module(), + ref :: reference() + ) :: node() defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do node = get_node(key, r, me, repls) send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me}) node end + @spec send_batch(name :: module(), batches :: map()) :: :ok def send_batch(name, batches) do Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}") @@ -343,6 +391,16 @@ defmodule ClusterKV.Ring do end) end + @spec send_wildcard( + parts :: [String.t()], + header :: String.t(), + wildcard :: String.t(), + join :: String.t(), + value :: any(), + name :: module(), + ring :: HashRing.t(), + replicas :: non_neg_integer() + ) :: [:ok] def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do parts |> Enum.with_index() @@ -357,6 +415,14 @@ defmodule ClusterKV.Ring do end) end + @spec redistribute_data( + name :: module(), + me :: node(), + db :: module(), + old_ring :: HashRing.t(), + new_ring :: HashRing.t(), + replicas :: non_neg_integer() + ) :: {:ok, pid()} def redistribute_data(name, me, db, old_ring, new_ring, repls) do Task.start(fn -> batch = @@ -380,19 +446,22 @@ defmodule ClusterKV.Ring do end) end + @spec maybe_reply(ref :: reference(), requests :: [Requests.t()], val :: any(), node :: node()) :: + {[Requests.t()], [:gen_statem.action()]} defp maybe_reply(ref, reqs, val, node) do - case get_request(ref, reqs) do + case Requests.get(ref, reqs) do {_, from, [_], res} -> - {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]} + {Requests.remove(ref, reqs), [{:reply, from, get_final_value(res, val)}]} {_, from, [], res} -> - {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]} + {Requests.remove(ref, reqs), [{:reply, from, get_final_value(res, val)}]} _ -> - {decrement_request(ref, reqs, val, node), []} + {Requests.decrement(ref, reqs, val, node), []} end end + @spec get_final_value(result :: [DB.element()], value :: any()) :: [DB.element()] | DB.element() defp get_final_value([], val), do: val defp get_final_value(res, val) when is_list(res), @@ -404,45 +473,27 @@ defmodule ClusterKV.Ring do _ -> true end) - defp get_request(ref, reqs) do - Enum.find(reqs, fn - {^ref, _, _, _} -> true - _ -> false - end) - end - - defp remove_node_from_requests(reqs, node) do - Enum.map(reqs, fn {ref, fr, nodes, res} -> - {ref, fr, List.delete(nodes, node), res} - end) - end - - defp decrement_request(ref, reqs, val, from) do - Enum.map(reqs, fn - {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]} - other -> other - end) - end - - defp remove_request(ref, requests) do - Enum.filter(requests, fn - {^ref, _, _, _} -> false - {_, _, _, _} -> true - end) - end - + @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok defp send_sync(name, nodes, key, value) do Enum.each(nodes, fn n -> send({name, n}, {:sync, key, value}) end) end + @spec get_existing_nodes(ring :: HashRing.t()) :: HashRing.t() defp get_existing_nodes(ring) do Enum.reduce(Node.list(), ring, fn n, acc -> HashRing.add_node(acc, n) end) end + @spec get_node( + key :: String.t(), + ring :: HashRing.t(), + node :: node(), + replicas :: non_neg_integer() + ) :: + node() defp get_node(key, r, node, repls) do nodes = HashRing.key_to_nodes(r, key, repls) diff --git a/mix.exs b/mix.exs index 1d69932..fd148ae 100644 --- a/mix.exs +++ b/mix.exs @@ -20,6 +20,7 @@ defmodule ClusterKv.MixProject do defp deps do [ + {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, {:libcluster, "~> 3.2"}, {:libring, "~> 1.4"}, {:states_language, "~> 0.2"} diff --git a/mix.lock b/mix.lock index a42d739..1600408 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,6 @@ %{ "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, + "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"}, -- 2.45.3