--- /dev/null
+# Used by "mix format"
+[
+ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
+]
--- /dev/null
+# The directory Mix will write compiled artifacts to.
+/_build/
+
+# If you run "mix test --cover", coverage assets end up here.
+/cover/
+
+# The directory Mix downloads your dependencies sources to.
+/deps/
+
+# Where third-party dependencies like ExDoc output generated docs.
+/doc/
+
+# Ignore .fetch files in case you like to edit your project deps locally.
+/.fetch
+
+# If the VM crashes, it generates a dump, let's ignore it too.
+erl_crash.dump
+
+# Also ignore archive artifacts (built via "mix archive.build").
+*.ez
+
+# Ignore package tarball (built via "mix hex.build").
+cluster_kv-*.tar
+
--- /dev/null
+elixir 1.9.4-otp-22
+erlang 22.2.6
--- /dev/null
+# ClusterKv
+
+**TODO: Add description**
+
+## Installation
+
+If [available in Hex](https://hex.pm/docs/publish), the package can be installed
+by adding `cluster_kv` to your list of dependencies in `mix.exs`:
+
+```elixir
+def deps do
+ [
+ {:cluster_kv, "~> 0.1.0"}
+ ]
+end
+```
+
+Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
+and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
+be found at [https://hexdocs.pm/cluster_kv](https://hexdocs.pm/cluster_kv).
+
--- /dev/null
+defmodule ClusterKV do
+ @moduledoc """
+ Documentation for ClusterKV.
+ """
+ use Supervisor
+ alias ClusterKV.{Ring, DB}
+
+ @spec start_link(
+ name: atom(),
+ db_path: charlist() | String.t(),
+ topologies: list(),
+ replicas: integer(),
+ quorum: integer()
+ ) ::
+ {:ok, pid()}
+ | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
+
+ def start_link(
+ name: name,
+ db_path: db_path,
+ topologies: topologies,
+ replicas: replicas,
+ quorum: quorum
+ ) do
+ Supervisor.start_link(__MODULE__, {name, db_path, topologies, replicas, quorum}, name: name)
+ end
+
+ def init({name, db_path, topologies, replicas, quorum}) do
+ db = db_name(name)
+ ring = ring_name(name)
+ cluster_supervisor = cluster_supervisor_name(name)
+ ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}
+
+ children = [
+ {DB, [name: db, db_path: db_path]},
+ {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]},
+ {Ring, [{:local, ring}, ring_data, []]}
+ ]
+
+ Supervisor.init(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
+ end
+
+ def db_name(name), do: Module.concat([name, DB])
+ def ring_name(name), do: Module.concat([name, Ring])
+ def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor])
+
+ defdelegate get(name, key), to: Ring
+ defdelegate put(name, key, value), to: Ring
+ defdelegate prefix(name, prefix), to: Ring
+end
--- /dev/null
+defmodule ClusterKV.DB do
+ use GenServer
+ require Logger
+
+ alias __MODULE__
+
+ @db_options [
+ {:create_if_missing, true},
+ {:merge_operator, :erlang_merge_operator},
+ {:capped_prefix_extractor, 255}
+ ]
+
+ defstruct [:db]
+
+ @type t :: %__MODULE__{
+ db: :rocksdb.db_handle()
+ }
+
+ @spec get(name :: module(), key :: binary()) ::
+ {:ok, term()} | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
+ def get(name, key) do
+ GenServer.call(name, {:get, key})
+ end
+
+ @spec get_prefix(name :: module(), prefix :: binary()) ::
+ [{:ok, term()}] | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
+ def get_prefix(name, prefix) do
+ GenServer.call(name, {:get_prefix, prefix})
+ end
+
+ @spec put(name :: module(), key :: binary(), value :: term()) :: :ok
+ def put(name, key, value) do
+ GenServer.cast(name, {:put, key, value})
+ end
+
+ def start_link(name: name, db_path: db_path) do
+ GenServer.start_link(__MODULE__, db_path, name: name)
+ end
+
+ def init(db_path) when is_binary(db_path) do
+ init(to_charlist(db_path))
+ end
+
+ def init(db_path) when is_list(db_path) do
+ {:ok, db} = :rocksdb.open(db_path, @db_options)
+ {:ok, %DB{db: db}}
+ end
+
+ def handle_call({:get, key}, _from, %DB{db: db} = state) do
+ {:reply,
+ db
+ |> :rocksdb.get(key, [])
+ |> deserialize(), state}
+ end
+
+ def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do
+ {:reply,
+ db
+ |> :rocksdb.get(prefix, [])
+ |> deserialize(), state}
+ end
+
+ def handle_cast({:put, key, value}, %DB{db: db} = state) do
+ :rocksdb.put(db, key, serialize(value), [])
+ {:noreply, state}
+ end
+
+ def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)}
+ def deserialize(other), do: other
+
+ def serialize(val), do: :erlang.term_to_binary(val)
+
+ def terminate(_, %{db: db}) do
+ :rocksdb.close(db)
+ end
+end
--- /dev/null
+defmodule ClusterKV.Ring do
+ @external_resource "priv/ring.json"
+ use StatesLanguage, data: "priv/ring.json"
+ require Logger
+ alias StatesLanguage, as: SL
+ alias ClusterKV.{Ring, DB}
+
+ @enforce_keys [:name, :replicas, :quorum, :db]
+ defstruct [
+ :name,
+ :replicas,
+ :quorum,
+ :ring,
+ :db,
+ requests: [],
+ anti_entropy_interval: 5 * 60_000
+ ]
+
+ @type t :: %__MODULE__{
+ replicas: integer(),
+ quorum: integer(),
+ db: module(),
+ ring: HashRing.t(),
+ anti_entropy_interval: integer()
+ }
+
+ # States
+ @init "Init"
+ @ready "Ready"
+ @quorum_node_up "QuorumNodeUp"
+ @quorum_node_down "QuorumNodeDown"
+ @ready_node_up "ReadyNodeUp"
+ @ready_node_down "ReadyNodeDown"
+ @quorum "Quorum"
+ @anti_entropy "AntiEntropy"
+
+ # Resources
+ @handle_init "HandleInit"
+ @handle_ready "HandleReady"
+ @handle_node_up "HandleNodeUp"
+ @await_quorum "AwaitQuorum"
+ @handle_node_down "HandleNodeDown"
+ @handle_anti_entropy "HandleAntiEntropy"
+
+ def put(name, key, value) do
+ :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
+ end
+
+ def get(name, key) do
+ :gen_statem.call(ClusterKV.ring_name(name), {:get, key})
+ end
+
+ def prefix(name, prefix) do
+ :gen_statem.call(ClusterKV.ring_name(name), {:prefix, prefix})
+ end
+
+ def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
+ ring =
+ Node.self()
+ |> HashRing.new()
+ |> get_existing_nodes()
+
+ :net_kernel.monitor_nodes(true, node_type: :all)
+ {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :initialized}]}
+ end
+
+ def handle_resource(@await_quorum, _, @quorum, %SL{data: %Ring{quorum: q, ring: r}} = sl) do
+ actions =
+ case length(HashRing.nodes(r)) >= q do
+ true -> [{:next_event, :internal, :quorum}]
+ false -> []
+ end
+
+ {:ok, sl, actions}
+ end
+
+ def handle_resource(@handle_ready, _, @ready, sl) do
+ Logger.info("Ready")
+ {:ok, sl, []}
+ end
+
+ def handle_resource(@handle_node_up, _, _, sl) do
+ {:ok, sl, [{:next_event, :internal, :node_added}]}
+ end
+
+ def handle_resource(@handle_node_down, _, _, sl) do
+ {:ok, sl, [{:next_event, :internal, :node_removed}]}
+ end
+
+ def handle_call(
+ {:get, key},
+ from,
+ @ready,
+ %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl
+ ) do
+ node = HashRing.key_to_node(r, key)
+ ref = make_ref()
+ dest = {n, node}
+ send(dest, {:get_key, key, ref, Node.self()})
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []}
+ end
+
+ def handle_cast(
+ {:put, key, value},
+ @ready,
+ %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
+ ) do
+ nodes = HashRing.key_to_nodes(r, key, repls)
+ cast_sync(n, nodes, key, value)
+ {:ok, sl, []}
+ end
+
+ def handle_cast({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
+ Enum.each(keys, fn {key, value} ->
+ DB.put(db, key, value)
+ end)
+
+ {:ok, sl, []}
+ end
+
+ def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do
+ val = DB.get(db, key)
+ send({n, node}, {:reply, val, ref})
+ {:ok, sl, []}
+ end
+
+ def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
+ {_, from} = get_request(ref, reqs)
+
+ {:ok, %SL{sl | data: %Ring{data | requests: remove_request(ref, reqs)}},
+ [{:reply, from, val}]}
+ end
+
+ def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+ Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
+
+ {:ok, %SL{sl | data: %Ring{data | ring: HashRing.add_node(r, node)}},
+ [{:next_event, :internal, :node_up}]}
+ end
+
+ def handle_info({:nodedown, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do
+ Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
+
+ {:ok, %SL{sl | data: %Ring{data | ring: HashRing.remove_node(r, node)}},
+ [{:next_event, :internal, :node_down}]}
+ end
+
+ defp get_request(ref, reqs) do
+ Enum.find(reqs, fn
+ {^ref, _} -> true
+ _ -> false
+ end)
+ end
+
+ defp remove_request(ref, requests) do
+ Enum.filter(requests, fn
+ {^ref, _} -> false
+ {_, _} -> true
+ end)
+ end
+
+ defp cast_sync(name, nodes, key, value) do
+ Enum.each(nodes, fn n ->
+ :gen_statem.cast({name, n}, {:sync, [{key, value}]})
+ end)
+ end
+
+ defp get_existing_nodes(ring) do
+ Enum.reduce(Node.list(), ring, fn n, acc ->
+ HashRing.add_node(acc, n)
+ end)
+ end
+end
--- /dev/null
+defmodule ClusterKv.MixProject do
+ use Mix.Project
+
+ def project do
+ [
+ app: :cluster_kv,
+ version: "0.1.0",
+ elixir: "~> 1.9",
+ start_permanent: Mix.env() == :prod,
+ deps: deps()
+ ]
+ end
+
+ # Run "mix help compile.app" to learn about applications.
+ def application do
+ [
+ extra_applications: [:logger]
+ ]
+ end
+
+ defp deps do
+ [
+ {:libcluster, "~> 3.2"},
+ {:libring, "~> 1.4"},
+ {:rocksdb, "~> 1.5"},
+ {:states_language, "~> 0.2"}
+ ]
+ end
+end
--- /dev/null
+%{
+ "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
+ "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
+ "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
+ "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
+ "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
+ "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
+ "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
+ "rocksdb": {:hex, :rocksdb, "1.5.1", "016861fbfb1f76ec2c48f745fca1f6e140fc0f27adc8c389c4cabce4a79b5f2f", [:rebar3], [], "hexpm", "63db7dfd65082d61df92810f61bd934080c77bc57eb711b36bb56a8926bdf3b0"},
+ "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
+ "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
+ "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"},
+}
--- /dev/null
+{
+ "StartAt": "Init",
+ "Comment": "",
+ "States": {
+ "Init": {
+ "Type": "Task",
+ "Resource": "HandleInit",
+ "TransitionEvent": ":initialized",
+ "Catch": [],
+ "InputPath": "",
+ "OutputPath": "",
+ "ResourcePath": "",
+ "Next": "Quorum",
+ "End": false
+ },
+ "Ready": {
+ "Type": "Choice",
+ "Resource": "HandleReady",
+ "Choices": [
+ {
+ "StringEquals": ":node_up",
+ "Next": "ReadyNodeUp"
+ },
+ {
+ "StringEquals": ":node_down",
+ "Next": "ReadyNodeDown"
+ },
+ {
+ "StringEquals": ":anti_entropy",
+ "Next": "AntiEntropy"
+ }
+ ],
+ "InputPath": "",
+ "OutputPath": ""
+ },
+ "QuorumNodeUp": {
+ "Type": "Task",
+ "Resource": "HandleNodeUp",
+ "TransitionEvent": ":node_added",
+ "Catch": [],
+ "InputPath": "",
+ "OutputPath": "",
+ "ResourcePath": "",
+ "Next": "Quorum",
+ "End": false
+ },
+ "ReadyNodeUp": {
+ "Type": "Task",
+ "Resource": "HandleNodeUp",
+ "TransitionEvent": ":node_added",
+ "Catch": [],
+ "InputPath": "",
+ "OutputPath": "",
+ "ResourcePath": "",
+ "Next": "Ready",
+ "End": false
+ },
+ "Quorum": {
+ "Type": "Choice",
+ "Resource": "AwaitQuorum",
+ "Choices": [
+ {
+ "StringEquals": ":quorum",
+ "Next": "Ready"
+ },
+ {
+ "StringEquals": ":node_up",
+ "Next": "QuorumNodeUp"
+ },
+ {
+ "StringEquals": ":node_down",
+ "Next": "QuorumNodeDown"
+ }
+ ],
+ "InputPath": "",
+ "OutputPath": ""
+ },
+ "ReadyNodeDown": {
+ "Type": "Task",
+ "Resource": "HandleNodeDown",
+ "TransitionEvent": ":node_removed",
+ "Catch": [],
+ "InputPath": "",
+ "OutputPath": "",
+ "ResourcePath": "",
+ "Next": "Quorum",
+ "End": false
+ },
+ "QuorumNodeDown": {
+ "Type": "Task",
+ "Resource": "HandleNodeDown",
+ "TransitionEvent": ":node_removed",
+ "Catch": [],
+ "InputPath": "",
+ "OutputPath": "",
+ "ResourcePath": "",
+ "Next": "Quorum",
+ "End": false
+ },
+ "AntiEntropy": {
+ "Type": "Task",
+ "Resource": "HandleAntiEntropy",
+ "TransitionEvent": ":done",
+ "Catch": [],
+ "InputPath": "",
+ "OutputPath": "",
+ "ResourcePath": "",
+ "Next": "Ready",
+ "End": false
+ }
+ }
+}
--- /dev/null
+defmodule ClusterKvTest do
+ use ExUnit.Case
+ doctest ClusterKv
+
+ test "greets the world" do
+ assert ClusterKv.hello() == :world
+ end
+end
--- /dev/null
+ExUnit.start()