]> Entropealabs - cluster_kv.git/commitdiff
initial commit
authorChristopher <chris@entropealabs.com>
Fri, 6 Mar 2020 22:24:07 +0000 (16:24 -0600)
committerChristopher <chris@entropealabs.com>
Fri, 6 Mar 2020 22:24:07 +0000 (16:24 -0600)
12 files changed:
.formatter.exs [new file with mode: 0644]
.gitignore [new file with mode: 0644]
.tool-versions [new file with mode: 0644]
README.md [new file with mode: 0644]
lib/cluster_kv.ex [new file with mode: 0644]
lib/cluster_kv/db.ex [new file with mode: 0644]
lib/cluster_kv/ring.ex [new file with mode: 0644]
mix.exs [new file with mode: 0644]
mix.lock [new file with mode: 0644]
priv/ring.json [new file with mode: 0644]
test/cluster_kv_test.exs [new file with mode: 0644]
test/test_helper.exs [new file with mode: 0644]

diff --git a/.formatter.exs b/.formatter.exs
new file mode 100644 (file)
index 0000000..d2cda26
--- /dev/null
@@ -0,0 +1,4 @@
+# Used by "mix format"
+[
+  inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
+]
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..27f433a
--- /dev/null
@@ -0,0 +1,24 @@
+# The directory Mix will write compiled artifacts to.
+/_build/
+
+# If you run "mix test --cover", coverage assets end up here.
+/cover/
+
+# The directory Mix downloads your dependencies sources to.
+/deps/
+
+# Where third-party dependencies like ExDoc output generated docs.
+/doc/
+
+# Ignore .fetch files in case you like to edit your project deps locally.
+/.fetch
+
+# If the VM crashes, it generates a dump, let's ignore it too.
+erl_crash.dump
+
+# Also ignore archive artifacts (built via "mix archive.build").
+*.ez
+
+# Ignore package tarball (built via "mix hex.build").
+cluster_kv-*.tar
+
diff --git a/.tool-versions b/.tool-versions
new file mode 100644 (file)
index 0000000..30c5b80
--- /dev/null
@@ -0,0 +1,2 @@
+elixir 1.9.4-otp-22
+erlang 22.2.6
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..f4dcfea
--- /dev/null
+++ b/README.md
@@ -0,0 +1,21 @@
+# ClusterKv
+
+**TODO: Add description**
+
+## Installation
+
+If [available in Hex](https://hex.pm/docs/publish), the package can be installed
+by adding `cluster_kv` to your list of dependencies in `mix.exs`:
+
+```elixir
+def deps do
+  [
+    {:cluster_kv, "~> 0.1.0"}
+  ]
+end
+```
+
+Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
+and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
+be found at [https://hexdocs.pm/cluster_kv](https://hexdocs.pm/cluster_kv).
+
diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex
new file mode 100644 (file)
index 0000000..2d3e504
--- /dev/null
@@ -0,0 +1,50 @@
+defmodule ClusterKV do
+  @moduledoc """
+  Documentation for ClusterKV.
+  """
+  use Supervisor
+  alias ClusterKV.{Ring, DB}
+
+  @spec start_link(
+          name: atom(),
+          db_path: charlist() | String.t(),
+          topologies: list(),
+          replicas: integer(),
+          quorum: integer()
+        ) ::
+          {:ok, pid()}
+          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
+
+  def start_link(
+        name: name,
+        db_path: db_path,
+        topologies: topologies,
+        replicas: replicas,
+        quorum: quorum
+      ) do
+    Supervisor.start_link(__MODULE__, {name, db_path, topologies, replicas, quorum}, name: name)
+  end
+
+  def init({name, db_path, topologies, replicas, quorum}) do
+    db = db_name(name)
+    ring = ring_name(name)
+    cluster_supervisor = cluster_supervisor_name(name)
+    ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}
+
+    children = [
+      {DB, [name: db, db_path: db_path]},
+      {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]},
+      {Ring, [{:local, ring}, ring_data, []]}
+    ]
+
+    Supervisor.init(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
+  end
+
+  def db_name(name), do: Module.concat([name, DB])
+  def ring_name(name), do: Module.concat([name, Ring])
+  def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor])
+
+  defdelegate get(name, key), to: Ring
+  defdelegate put(name, key, value), to: Ring
+  defdelegate prefix(name, prefix), to: Ring
+end
diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex
new file mode 100644 (file)
index 0000000..628c91e
--- /dev/null
@@ -0,0 +1,76 @@
+defmodule ClusterKV.DB do
+  use GenServer
+  require Logger
+
+  alias __MODULE__
+
+  @db_options [
+    {:create_if_missing, true},
+    {:merge_operator, :erlang_merge_operator},
+    {:capped_prefix_extractor, 255}
+  ]
+
+  defstruct [:db]
+
+  @type t :: %__MODULE__{
+          db: :rocksdb.db_handle()
+        }
+
+  @spec get(name :: module(), key :: binary()) ::
+          {:ok, term()} | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
+  def get(name, key) do
+    GenServer.call(name, {:get, key})
+  end
+
+  @spec get_prefix(name :: module(), prefix :: binary()) ::
+          [{:ok, term()}] | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
+  def get_prefix(name, prefix) do
+    GenServer.call(name, {:get_prefix, prefix})
+  end
+
+  @spec put(name :: module(), key :: binary(), value :: term()) :: :ok
+  def put(name, key, value) do
+    GenServer.cast(name, {:put, key, value})
+  end
+
+  def start_link(name: name, db_path: db_path) do
+    GenServer.start_link(__MODULE__, db_path, name: name)
+  end
+
+  def init(db_path) when is_binary(db_path) do
+    init(to_charlist(db_path))
+  end
+
+  def init(db_path) when is_list(db_path) do
+    {:ok, db} = :rocksdb.open(db_path, @db_options)
+    {:ok, %DB{db: db}}
+  end
+
+  def handle_call({:get, key}, _from, %DB{db: db} = state) do
+    {:reply,
+     db
+     |> :rocksdb.get(key, [])
+     |> deserialize(), state}
+  end
+
+  def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do
+    {:reply,
+     db
+     |> :rocksdb.get(prefix, [])
+     |> deserialize(), state}
+  end
+
+  def handle_cast({:put, key, value}, %DB{db: db} = state) do
+    :rocksdb.put(db, key, serialize(value), [])
+    {:noreply, state}
+  end
+
+  def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)}
+  def deserialize(other), do: other
+
+  def serialize(val), do: :erlang.term_to_binary(val)
+
+  def terminate(_, %{db: db}) do
+    :rocksdb.close(db)
+  end
+end
diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex
new file mode 100644 (file)
index 0000000..701bc94
--- /dev/null
@@ -0,0 +1,173 @@
+defmodule ClusterKV.Ring do
+  @external_resource "priv/ring.json"
+  use StatesLanguage, data: "priv/ring.json"
+  require Logger
+  alias StatesLanguage, as: SL
+  alias ClusterKV.{Ring, DB}
+
+  @enforce_keys [:name, :replicas, :quorum, :db]
+  defstruct [
+    :name,
+    :replicas,
+    :quorum,
+    :ring,
+    :db,
+    requests: [],
+    anti_entropy_interval: 5 * 60_000
+  ]
+
+  @type t :: %__MODULE__{
+          replicas: integer(),
+          quorum: integer(),
+          db: module(),
+          ring: HashRing.t(),
+          anti_entropy_interval: integer()
+        }
+
+  # States
+  @init "Init"
+  @ready "Ready"
+  @quorum_node_up "QuorumNodeUp"
+  @quorum_node_down "QuorumNodeDown"
+  @ready_node_up "ReadyNodeUp"
+  @ready_node_down "ReadyNodeDown"
+  @quorum "Quorum"
+  @anti_entropy "AntiEntropy"
+
+  # Resources
+  @handle_init "HandleInit"
+  @handle_ready "HandleReady"
+  @handle_node_up "HandleNodeUp"
+  @await_quorum "AwaitQuorum"
+  @handle_node_down "HandleNodeDown"
+  @handle_anti_entropy "HandleAntiEntropy"
+
+  def put(name, key, value) do
+    :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
+  end
+
+  def get(name, key) do
+    :gen_statem.call(ClusterKV.ring_name(name), {:get, key})
+  end
+
+  def prefix(name, prefix) do
+    :gen_statem.call(ClusterKV.ring_name(name), {:prefix, prefix})
+  end
+
+  def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
+    ring =
+      Node.self()
+      |> HashRing.new()
+      |> get_existing_nodes()
+
+    :net_kernel.monitor_nodes(true, node_type: :all)
+    {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :initialized}]}
+  end
+
+  def handle_resource(@await_quorum, _, @quorum, %SL{data: %Ring{quorum: q, ring: r}} = sl) do
+    actions =
+      case length(HashRing.nodes(r)) >= q do
+        true -> [{:next_event, :internal, :quorum}]
+        false -> []
+      end
+
+    {:ok, sl, actions}
+  end
+
+  def handle_resource(@handle_ready, _, @ready, sl) do
+    Logger.info("Ready")
+    {:ok, sl, []}
+  end
+
+  def handle_resource(@handle_node_up, _, _, sl) do
+    {:ok, sl, [{:next_event, :internal, :node_added}]}
+  end
+
+  def handle_resource(@handle_node_down, _, _, sl) do
+    {:ok, sl, [{:next_event, :internal, :node_removed}]}
+  end
+
+  def handle_call(
+        {:get, key},
+        from,
+        @ready,
+        %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl
+      ) do
+    node = HashRing.key_to_node(r, key)
+    ref = make_ref()
+    dest = {n, node}
+    send(dest, {:get_key, key, ref, Node.self()})
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []}
+  end
+
+  def handle_cast(
+        {:put, key, value},
+        @ready,
+        %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+      ) do
+    nodes = HashRing.key_to_nodes(r, key, repls)
+    cast_sync(n, nodes, key, value)
+    {:ok, sl, []}
+  end
+
+  def handle_cast({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
+    Enum.each(keys, fn {key, value} ->
+      DB.put(db, key, value)
+    end)
+
+    {:ok, sl, []}
+  end
+
+  def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do
+    val = DB.get(db, key)
+    send({n, node}, {:reply, val, ref})
+    {:ok, sl, []}
+  end
+
+  def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
+    {_, from} = get_request(ref, reqs)
+
+    {:ok, %SL{sl | data: %Ring{data | requests: remove_request(ref, reqs)}},
+     [{:reply, from, val}]}
+  end
+
+  def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+    Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
+
+    {:ok, %SL{sl | data: %Ring{data | ring: HashRing.add_node(r, node)}},
+     [{:next_event, :internal, :node_up}]}
+  end
+
+  def handle_info({:nodedown, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+    Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
+
+    {:ok, %SL{sl | data: %Ring{data | ring: HashRing.remove_node(r, node)}},
+     [{:next_event, :internal, :node_down}]}
+  end
+
+  defp get_request(ref, reqs) do
+    Enum.find(reqs, fn
+      {^ref, _} -> true
+      _ -> false
+    end)
+  end
+
+  defp remove_request(ref, requests) do
+    Enum.filter(requests, fn
+      {^ref, _} -> false
+      {_, _} -> true
+    end)
+  end
+
+  defp cast_sync(name, nodes, key, value) do
+    Enum.each(nodes, fn n ->
+      :gen_statem.cast({name, n}, {:sync, [{key, value}]})
+    end)
+  end
+
+  defp get_existing_nodes(ring) do
+    Enum.reduce(Node.list(), ring, fn n, acc ->
+      HashRing.add_node(acc, n)
+    end)
+  end
+end
diff --git a/mix.exs b/mix.exs
new file mode 100644 (file)
index 0000000..e8d9540
--- /dev/null
+++ b/mix.exs
@@ -0,0 +1,29 @@
+defmodule ClusterKv.MixProject do
+  use Mix.Project
+
+  def project do
+    [
+      app: :cluster_kv,
+      version: "0.1.0",
+      elixir: "~> 1.9",
+      start_permanent: Mix.env() == :prod,
+      deps: deps()
+    ]
+  end
+
+  # Run "mix help compile.app" to learn about applications.
+  def application do
+    [
+      extra_applications: [:logger]
+    ]
+  end
+
+  defp deps do
+    [
+      {:libcluster, "~> 3.2"},
+      {:libring, "~> 1.4"},
+      {:rocksdb, "~> 1.5"},
+      {:states_language, "~> 0.2"}
+    ]
+  end
+end
diff --git a/mix.lock b/mix.lock
new file mode 100644 (file)
index 0000000..b4f29f9
--- /dev/null
+++ b/mix.lock
@@ -0,0 +1,13 @@
+%{
+  "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
+  "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
+  "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
+  "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
+  "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
+  "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
+  "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
+  "rocksdb": {:hex, :rocksdb, "1.5.1", "016861fbfb1f76ec2c48f745fca1f6e140fc0f27adc8c389c4cabce4a79b5f2f", [:rebar3], [], "hexpm", "63db7dfd65082d61df92810f61bd934080c77bc57eb711b36bb56a8926bdf3b0"},
+  "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
+  "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
+  "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"},
+}
diff --git a/priv/ring.json b/priv/ring.json
new file mode 100644 (file)
index 0000000..4a07b37
--- /dev/null
@@ -0,0 +1,112 @@
+{
+  "StartAt": "Init",
+  "Comment": "",
+  "States": {
+    "Init": {
+      "Type": "Task",
+      "Resource": "HandleInit",
+      "TransitionEvent": ":initialized",
+      "Catch": [],
+      "InputPath": "",
+      "OutputPath": "",
+      "ResourcePath": "",
+      "Next": "Quorum",
+      "End": false
+    },
+    "Ready": {
+      "Type": "Choice",
+      "Resource": "HandleReady",
+      "Choices": [
+        {
+          "StringEquals": ":node_up",
+          "Next": "ReadyNodeUp"
+        },
+        {
+          "StringEquals": ":node_down",
+          "Next": "ReadyNodeDown"
+        },
+        {
+          "StringEquals": ":anti_entropy",
+          "Next": "AntiEntropy"
+        }
+      ],
+      "InputPath": "",
+      "OutputPath": ""
+    },
+    "QuorumNodeUp": {
+      "Type": "Task",
+      "Resource": "HandleNodeUp",
+      "TransitionEvent": ":node_added",
+      "Catch": [],
+      "InputPath": "",
+      "OutputPath": "",
+      "ResourcePath": "",
+      "Next": "Quorum",
+      "End": false
+    },
+    "ReadyNodeUp": {
+      "Type": "Task",
+      "Resource": "HandleNodeUp",
+      "TransitionEvent": ":node_added",
+      "Catch": [],
+      "InputPath": "",
+      "OutputPath": "",
+      "ResourcePath": "",
+      "Next": "Ready",
+      "End": false
+    },
+    "Quorum": {
+      "Type": "Choice",
+      "Resource": "AwaitQuorum",
+      "Choices": [
+        {
+          "StringEquals": ":quorum",
+          "Next": "Ready"
+        },
+        {
+          "StringEquals": ":node_up",
+          "Next": "QuorumNodeUp"
+        },
+        {
+          "StringEquals": ":node_down",
+          "Next": "QuorumNodeDown"
+        }
+      ],
+      "InputPath": "",
+      "OutputPath": ""
+    },
+    "ReadyNodeDown": {
+      "Type": "Task",
+      "Resource": "HandleNodeDown",
+      "TransitionEvent": ":node_removed",
+      "Catch": [],
+      "InputPath": "",
+      "OutputPath": "",
+      "ResourcePath": "",
+      "Next": "Quorum",
+      "End": false
+    },
+    "QuorumNodeDown": {
+      "Type": "Task",
+      "Resource": "HandleNodeDown",
+      "TransitionEvent": ":node_removed",
+      "Catch": [],
+      "InputPath": "",
+      "OutputPath": "",
+      "ResourcePath": "",
+      "Next": "Quorum",
+      "End": false
+    },
+    "AntiEntropy": {
+      "Type": "Task",
+      "Resource": "HandleAntiEntropy",
+      "TransitionEvent": ":done",
+      "Catch": [],
+      "InputPath": "",
+      "OutputPath": "",
+      "ResourcePath": "",
+      "Next": "Ready",
+      "End": false
+    }
+  }
+}
diff --git a/test/cluster_kv_test.exs b/test/cluster_kv_test.exs
new file mode 100644 (file)
index 0000000..431dddf
--- /dev/null
@@ -0,0 +1,8 @@
+defmodule ClusterKvTest do
+  use ExUnit.Case
+  doctest ClusterKv
+
+  test "greets the world" do
+    assert ClusterKv.hello() == :world
+  end
+end
diff --git a/test/test_helper.exs b/test/test_helper.exs
new file mode 100644 (file)
index 0000000..869559e
--- /dev/null
@@ -0,0 +1 @@
+ExUnit.start()