]> Entropealabs - cluster_kv.git/commitdiff
more specs and dialyzer pass
authorChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 17:32:27 +0000 (12:32 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 17:32:27 +0000 (12:32 -0500)
lib/cluster_kv/db.ex
lib/cluster_kv/requests.ex [new file with mode: 0644]
lib/cluster_kv/ring.ex
mix.exs
mix.lock

index 74391c5d6e4beb0715e0956d7eeb68f166ded97b..8f1e17fc20741ad3bf41a6233b8307fba61ef277 100644 (file)
@@ -8,16 +8,19 @@ defmodule ClusterKV.DB do
 
   defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []]
 
+  @type element :: {key :: String.t(), value :: any()}
+
   @type t :: %__MODULE__{
           db: :ets.tid() | atom(),
-          batch: list(),
+          batch: [element()],
           batch_chunk: integer(),
           batch_fun: fun(),
           batch_ref: reference(),
           last_batch: integer()
         }
 
-  @spec get(name :: module(), key :: binary()) :: [tuple()]
+  @spec get(name :: module(), key :: binary(), timeout :: non_neg_integer() | :infinity) ::
+          element() | :not_found
   def get(name, key, timeout \\ :infinity) do
     GenServer.call(name, {:get, key}, timeout)
   end
@@ -32,14 +35,17 @@ defmodule ClusterKV.DB do
     GenServer.cast(name, {:upsert, key, value, fun})
   end
 
+  @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok
   def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do
     GenServer.cast(name, {:batch, batch, chunk, fun})
   end
 
+  @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t()
   def stream(name, timeout \\ :infinity) do
     GenServer.call(name, :stream, timeout)
   end
 
+  @spec start_link(name: module()) :: GenServer.on_start()
   def start_link(name: name) do
     GenServer.start_link(__MODULE__, name, name: name)
   end
@@ -98,6 +104,13 @@ defmodule ClusterKV.DB do
     {:noreply, %DB{state | batch_ref: ref, batch: batch, last_batch: lb}}
   end
 
+  @spec handle_next_batch_chunk(
+          db :: reference(),
+          batch :: [element()],
+          chunk :: integer(),
+          last_batch :: integer(),
+          fun :: fun()
+        ) :: {[element()], integer(), reference()}
   defp handle_next_batch_chunk(db, batch, chunk, last_batch, fun) do
     Logger.debug("processing batch of #{length(batch)} from #{last_batch} with chunk of #{chunk}")
 
@@ -118,6 +131,7 @@ defmodule ClusterKV.DB do
     end
   end
 
+  @spec do_upsert(db :: module(), key :: String.t(), value :: any(), fun :: fun()) :: true
   defp do_upsert(db, key, value, fun) do
     case :ets.lookup(db, key) do
       [] ->
@@ -138,6 +152,7 @@ defmodule ClusterKV.DB do
     end
   end
 
+  @spec do_get(db :: module(), key :: String.t()) :: element() | :not_found
   defp do_get(db, key) do
     case :ets.lookup(db, key) do
       [] -> :not_found
@@ -145,6 +160,7 @@ defmodule ClusterKV.DB do
     end
   end
 
+  @spec get_stream(db :: module()) :: Enumerable.t()
   defp get_stream(db) do
     :ets.safe_fixtable(db, true)
 
diff --git a/lib/cluster_kv/requests.ex b/lib/cluster_kv/requests.ex
new file mode 100644 (file)
index 0000000..62c1c1b
--- /dev/null
@@ -0,0 +1,46 @@
+defmodule ClusterKV.Requests do
+  @moduledoc """
+  Type and functions for working with requests
+  """
+  alias ClusterKV.DB
+
+  @type t ::
+          {reference :: reference(), from :: GenServer.from(), nodes :: [node()],
+           responses :: [DB.element()]}
+
+  @spec get(reference :: reference(), requests :: [t()]) :: t() | nil
+  def get(ref, reqs) do
+    Enum.find(reqs, fn
+      {^ref, _, _, _} -> true
+      _ -> false
+    end)
+  end
+
+  @spec remove_node(requests :: [t()], node :: node()) :: [t()]
+  def remove_node(reqs, node) do
+    Enum.map(reqs, fn {ref, fr, nodes, res} ->
+      {ref, fr, List.delete(nodes, node), res}
+    end)
+  end
+
+  @spec decrement(
+          reference :: reference(),
+          requests :: [t()],
+          value :: any(),
+          from :: node()
+        ) :: [t()]
+  def decrement(ref, reqs, val, from) do
+    Enum.map(reqs, fn
+      {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]}
+      other -> other
+    end)
+  end
+
+  @spec remove(reference :: reference(), requests :: [t()]) :: [t()]
+  def remove(ref, requests) do
+    Enum.filter(requests, fn
+      {^ref, _, _, _} -> false
+      {_, _, _, _} -> true
+    end)
+  end
+end
index 2924bf8764e5b8d926f19f8c8c98158832df0a50..ac6bc31a7e9c4cddb812a10c52e95bddc87e0581 100644 (file)
@@ -3,7 +3,7 @@ defmodule ClusterKV.Ring do
   use StatesLanguage, data: "priv/ring.json"
   require Logger
   alias StatesLanguage, as: SL
-  alias ClusterKV.{Ring, DB}
+  alias ClusterKV.{DB, Requests, Ring}
 
   @enforce_keys [:name, :replicas, :quorum, :db]
   defstruct [
@@ -23,6 +23,7 @@ defmodule ClusterKV.Ring do
           db: module(),
           node: node(),
           ring: HashRing.t(),
+          requests: [Requests.t()],
           anti_entropy_interval: integer()
         }
 
@@ -30,7 +31,6 @@ defmodule ClusterKV.Ring do
   @init "Init"
   @ready "Ready"
   @quorum "Quorum"
-  @anti_entropy "AntiEntropy"
 
   # Resources
   @handle_init "HandleInit"
@@ -38,12 +38,21 @@ defmodule ClusterKV.Ring do
   @await_quorum "AwaitQuorum"
   @handle_node_up "HandleNodeUp"
   @handle_node_down "HandleNodeDown"
-  @handle_anti_entropy "HandleAntiEntropy"
 
+  @spec put(name :: module(), key :: String.t(), value :: term()) :: :ok
   def put(name, key, value) do
     :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
   end
 
+  @spec put_wildcard(
+          name :: module(),
+          header :: String.t(),
+          key :: String.t(),
+          value :: term(),
+          split_on :: String.t(),
+          join :: String.t(),
+          wildcard :: String.t()
+        ) :: :ok
   def put_wildcard(name, header, key, value, split_on, join, wildcard) do
     :gen_statem.cast(
       ClusterKV.ring_name(name),
@@ -51,18 +60,37 @@ defmodule ClusterKV.Ring do
     )
   end
 
+  @spec batch(name :: module(), batch :: [DB.element()]) :: :ok
   def batch(name, batch) do
     :gen_statem.cast(ClusterKV.ring_name(name), {:batch, batch})
   end
 
+  @spec get(name :: module(), key :: String.t(), timeout :: non_neg_integer() | :infinity) ::
+          DB.element() | :not_found
   def get(name, key, timeout) do
     :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
   end
 
+  @spec prefix(
+          name :: module(),
+          key :: String.t(),
+          split_on :: String.t(),
+          min :: integer(),
+          timeout :: non_neg_integer() | :infinity
+        ) :: [DB.element()]
   def prefix(name, key, split_on, min, timeout) do
     :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}, timeout)
   end
 
+  @spec wildcard(
+          name :: module(),
+          header :: String.t(),
+          key :: String.t(),
+          split_on :: String.t(),
+          join :: String.t(),
+          wildcard :: String.t(),
+          timeout :: non_neg_integer() | :infinity
+        ) :: [DB.element()]
   def wildcard(name, header, key, split_on, join, wildcard, timeout) do
     :gen_statem.call(
       ClusterKV.ring_name(name),
@@ -71,6 +99,7 @@ defmodule ClusterKV.Ring do
     )
   end
 
+  @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t()
   def stream(name, timeout) do
     :gen_statem.call(ClusterKV.ring_name(name), :stream, timeout)
   end
@@ -297,7 +326,7 @@ defmodule ClusterKV.Ring do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.remove_node(r, node)
     redistribute_data(n, me, db, r, ring, repls)
-    reqs = remove_node_from_requests(reqs, node)
+    reqs = Requests.remove_node(reqs, node)
 
     {requests, actions} =
       Enum.reduce(reqs, {reqs, [{:next_event, :internal, :node_down}]}, fn {ref, _fr, _nodes,
@@ -321,6 +350,14 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
+  @spec get_prefix(
+          next :: [String.t()],
+          ring :: HashRing.t(),
+          me :: node(),
+          replicas :: non_neg_integer(),
+          name :: module(),
+          reference :: reference()
+        ) :: node()
   defp get_prefix(next, r, me, repls, n, ref) do
     prefix = Enum.join(next, ".")
     node = get_node(prefix, r, me, repls)
@@ -328,12 +365,23 @@ defmodule ClusterKV.Ring do
     node
   end
 
+  @spec get_wildcard(
+          key :: String.t(),
+          parts :: [String.t()],
+          wildcard :: String.t(),
+          ring :: HashRing.t(),
+          me :: node(),
+          replicas :: non_neg_integer(),
+          name :: module(),
+          ref :: reference()
+        ) :: node()
   defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do
     node = get_node(key, r, me, repls)
     send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
     node
   end
 
+  @spec send_batch(name :: module(), batches :: map()) :: :ok
   def send_batch(name, batches) do
     Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}")
 
@@ -343,6 +391,16 @@ defmodule ClusterKV.Ring do
     end)
   end
 
+  @spec send_wildcard(
+          parts :: [String.t()],
+          header :: String.t(),
+          wildcard :: String.t(),
+          join :: String.t(),
+          value :: any(),
+          name :: module(),
+          ring :: HashRing.t(),
+          replicas :: non_neg_integer()
+        ) :: [:ok]
   def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do
     parts
     |> Enum.with_index()
@@ -357,6 +415,14 @@ defmodule ClusterKV.Ring do
     end)
   end
 
+  @spec redistribute_data(
+          name :: module(),
+          me :: node(),
+          db :: module(),
+          old_ring :: HashRing.t(),
+          new_ring :: HashRing.t(),
+          replicas :: non_neg_integer()
+        ) :: {:ok, pid()}
   def redistribute_data(name, me, db, old_ring, new_ring, repls) do
     Task.start(fn ->
       batch =
@@ -380,19 +446,22 @@ defmodule ClusterKV.Ring do
     end)
   end
 
+  @spec maybe_reply(ref :: reference(), requests :: [Requests.t()], val :: any(), node :: node()) ::
+          {[Requests.t()], [:gen_statem.action()]}
   defp maybe_reply(ref, reqs, val, node) do
-    case get_request(ref, reqs) do
+    case Requests.get(ref, reqs) do
       {_, from, [_], res} ->
-        {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+        {Requests.remove(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
 
       {_, from, [], res} ->
-        {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+        {Requests.remove(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
 
       _ ->
-        {decrement_request(ref, reqs, val, node), []}
+        {Requests.decrement(ref, reqs, val, node), []}
     end
   end
 
+  @spec get_final_value(result :: [DB.element()], value :: any()) :: [DB.element()] | DB.element()
   defp get_final_value([], val), do: val
 
   defp get_final_value(res, val) when is_list(res),
@@ -404,45 +473,27 @@ defmodule ClusterKV.Ring do
         _ -> true
       end)
 
-  defp get_request(ref, reqs) do
-    Enum.find(reqs, fn
-      {^ref, _, _, _} -> true
-      _ -> false
-    end)
-  end
-
-  defp remove_node_from_requests(reqs, node) do
-    Enum.map(reqs, fn {ref, fr, nodes, res} ->
-      {ref, fr, List.delete(nodes, node), res}
-    end)
-  end
-
-  defp decrement_request(ref, reqs, val, from) do
-    Enum.map(reqs, fn
-      {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]}
-      other -> other
-    end)
-  end
-
-  defp remove_request(ref, requests) do
-    Enum.filter(requests, fn
-      {^ref, _, _, _} -> false
-      {_, _, _, _} -> true
-    end)
-  end
-
+  @spec send_sync(name :: module(), nodes :: [node()], key :: String.t(), value :: any()) :: :ok
   defp send_sync(name, nodes, key, value) do
     Enum.each(nodes, fn n ->
       send({name, n}, {:sync, key, value})
     end)
   end
 
+  @spec get_existing_nodes(ring :: HashRing.t()) :: HashRing.t()
   defp get_existing_nodes(ring) do
     Enum.reduce(Node.list(), ring, fn n, acc ->
       HashRing.add_node(acc, n)
     end)
   end
 
+  @spec get_node(
+          key :: String.t(),
+          ring :: HashRing.t(),
+          node :: node(),
+          replicas :: non_neg_integer()
+        ) ::
+          node()
   defp get_node(key, r, node, repls) do
     nodes = HashRing.key_to_nodes(r, key, repls)
 
diff --git a/mix.exs b/mix.exs
index 1d6993269291be0f9e1326237b1c107f7bd9c5eb..fd148ae72327f5d933bbef5c43efc2f74b0e0bf5 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -20,6 +20,7 @@ defmodule ClusterKv.MixProject do
 
   defp deps do
     [
+      {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
       {:libcluster, "~> 3.2"},
       {:libring, "~> 1.4"},
       {:states_language, "~> 0.2"}
index a42d739f970e7cdf5d269ab4e599a30bc9591e6a..1600408a2365002a698611300d215f585ea73ca7 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -1,5 +1,6 @@
 %{
   "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
+  "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
   "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
   "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
   "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},