From: Christopher Date: Fri, 6 Mar 2020 22:24:07 +0000 (-0600) Subject: initial commit X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=12aaec934d9e098ce7025c66b910dde76bdbfe2b;p=cluster_kv.git initial commit --- 12aaec934d9e098ce7025c66b910dde76bdbfe2b diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..27f433a --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +cluster_kv-*.tar + diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..30c5b80 --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +elixir 1.9.4-otp-22 +erlang 22.2.6 diff --git a/README.md b/README.md new file mode 100644 index 0000000..f4dcfea --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# ClusterKv + +**TODO: Add description** + +## Installation + +If [available in Hex](https://hex.pm/docs/publish), the package can be installed +by adding `cluster_kv` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:cluster_kv, "~> 0.1.0"} + ] +end +``` + +Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc) +and published on [HexDocs](https://hexdocs.pm). Once published, the docs can +be found at [https://hexdocs.pm/cluster_kv](https://hexdocs.pm/cluster_kv). + diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex new file mode 100644 index 0000000..2d3e504 --- /dev/null +++ b/lib/cluster_kv.ex @@ -0,0 +1,50 @@ +defmodule ClusterKV do + @moduledoc """ + Documentation for ClusterKV. + """ + use Supervisor + alias ClusterKV.{Ring, DB} + + @spec start_link( + name: atom(), + db_path: charlist() | String.t(), + topologies: list(), + replicas: integer(), + quorum: integer() + ) :: + {:ok, pid()} + | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} + + def start_link( + name: name, + db_path: db_path, + topologies: topologies, + replicas: replicas, + quorum: quorum + ) do + Supervisor.start_link(__MODULE__, {name, db_path, topologies, replicas, quorum}, name: name) + end + + def init({name, db_path, topologies, replicas, quorum}) do + db = db_name(name) + ring = ring_name(name) + cluster_supervisor = cluster_supervisor_name(name) + ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db} + + children = [ + {DB, [name: db, db_path: db_path]}, + {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}, + {Ring, [{:local, ring}, ring_data, []]} + ] + + Supervisor.init(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10) + end + + def db_name(name), do: Module.concat([name, DB]) + def ring_name(name), do: Module.concat([name, Ring]) + def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor]) + + defdelegate get(name, key), to: Ring + defdelegate put(name, key, value), to: Ring + defdelegate prefix(name, prefix), to: Ring +end diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex new file mode 100644 index 0000000..628c91e --- /dev/null +++ b/lib/cluster_kv/db.ex @@ -0,0 +1,76 @@ +defmodule ClusterKV.DB do + use GenServer + require Logger + + alias __MODULE__ + + @db_options [ + {:create_if_missing, true}, + {:merge_operator, :erlang_merge_operator}, + {:capped_prefix_extractor, 255} + ] + + defstruct [:db] + + @type t :: %__MODULE__{ + db: :rocksdb.db_handle() + } + + @spec get(name :: module(), key :: binary()) :: + {:ok, term()} | :not_found | {:error, {:corruption, String.t()}} | {:error, any()} + def get(name, key) do + GenServer.call(name, {:get, key}) + end + + @spec get_prefix(name :: module(), prefix :: binary()) :: + [{:ok, term()}] | :not_found | {:error, {:corruption, String.t()}} | {:error, any()} + def get_prefix(name, prefix) do + GenServer.call(name, {:get_prefix, prefix}) + end + + @spec put(name :: module(), key :: binary(), value :: term()) :: :ok + def put(name, key, value) do + GenServer.cast(name, {:put, key, value}) + end + + def start_link(name: name, db_path: db_path) do + GenServer.start_link(__MODULE__, db_path, name: name) + end + + def init(db_path) when is_binary(db_path) do + init(to_charlist(db_path)) + end + + def init(db_path) when is_list(db_path) do + {:ok, db} = :rocksdb.open(db_path, @db_options) + {:ok, %DB{db: db}} + end + + def handle_call({:get, key}, _from, %DB{db: db} = state) do + {:reply, + db + |> :rocksdb.get(key, []) + |> deserialize(), state} + end + + def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do + {:reply, + db + |> :rocksdb.get(prefix, []) + |> deserialize(), state} + end + + def handle_cast({:put, key, value}, %DB{db: db} = state) do + :rocksdb.put(db, key, serialize(value), []) + {:noreply, state} + end + + def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)} + def deserialize(other), do: other + + def serialize(val), do: :erlang.term_to_binary(val) + + def terminate(_, %{db: db}) do + :rocksdb.close(db) + end +end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex new file mode 100644 index 0000000..701bc94 --- /dev/null +++ b/lib/cluster_kv/ring.ex @@ -0,0 +1,173 @@ +defmodule ClusterKV.Ring do + @external_resource "priv/ring.json" + use StatesLanguage, data: "priv/ring.json" + require Logger + alias StatesLanguage, as: SL + alias ClusterKV.{Ring, DB} + + @enforce_keys [:name, :replicas, :quorum, :db] + defstruct [ + :name, + :replicas, + :quorum, + :ring, + :db, + requests: [], + anti_entropy_interval: 5 * 60_000 + ] + + @type t :: %__MODULE__{ + replicas: integer(), + quorum: integer(), + db: module(), + ring: HashRing.t(), + anti_entropy_interval: integer() + } + + # States + @init "Init" + @ready "Ready" + @quorum_node_up "QuorumNodeUp" + @quorum_node_down "QuorumNodeDown" + @ready_node_up "ReadyNodeUp" + @ready_node_down "ReadyNodeDown" + @quorum "Quorum" + @anti_entropy "AntiEntropy" + + # Resources + @handle_init "HandleInit" + @handle_ready "HandleReady" + @handle_node_up "HandleNodeUp" + @await_quorum "AwaitQuorum" + @handle_node_down "HandleNodeDown" + @handle_anti_entropy "HandleAntiEntropy" + + def put(name, key, value) do + :gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value}) + end + + def get(name, key) do + :gen_statem.call(ClusterKV.ring_name(name), {:get, key}) + end + + def prefix(name, prefix) do + :gen_statem.call(ClusterKV.ring_name(name), {:prefix, prefix}) + end + + def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do + ring = + Node.self() + |> HashRing.new() + |> get_existing_nodes() + + :net_kernel.monitor_nodes(true, node_type: :all) + {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :initialized}]} + end + + def handle_resource(@await_quorum, _, @quorum, %SL{data: %Ring{quorum: q, ring: r}} = sl) do + actions = + case length(HashRing.nodes(r)) >= q do + true -> [{:next_event, :internal, :quorum}] + false -> [] + end + + {:ok, sl, actions} + end + + def handle_resource(@handle_ready, _, @ready, sl) do + Logger.info("Ready") + {:ok, sl, []} + end + + def handle_resource(@handle_node_up, _, _, sl) do + {:ok, sl, [{:next_event, :internal, :node_added}]} + end + + def handle_resource(@handle_node_down, _, _, sl) do + {:ok, sl, [{:next_event, :internal, :node_removed}]} + end + + def handle_call( + {:get, key}, + from, + @ready, + %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl + ) do + node = HashRing.key_to_node(r, key) + ref = make_ref() + dest = {n, node} + send(dest, {:get_key, key, ref, Node.self()}) + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []} + end + + def handle_cast( + {:put, key, value}, + @ready, + %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl + ) do + nodes = HashRing.key_to_nodes(r, key, repls) + cast_sync(n, nodes, key, value) + {:ok, sl, []} + end + + def handle_cast({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do + Enum.each(keys, fn {key, value} -> + DB.put(db, key, value) + end) + + {:ok, sl, []} + end + + def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do + val = DB.get(db, key) + send({n, node}, {:reply, val, ref}) + {:ok, sl, []} + end + + def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do + {_, from} = get_request(ref, reqs) + + {:ok, %SL{sl | data: %Ring{data | requests: remove_request(ref, reqs)}}, + [{:reply, from, val}]} + end + + def handle_info({:nodeup, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do + Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") + + {:ok, %SL{sl | data: %Ring{data | ring: HashRing.add_node(r, node)}}, + [{:next_event, :internal, :node_up}]} + end + + def handle_info({:nodedown, node, info}, _, %SL{data: %Ring{ring: r} = data} = sl) do + Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") + + {:ok, %SL{sl | data: %Ring{data | ring: HashRing.remove_node(r, node)}}, + [{:next_event, :internal, :node_down}]} + end + + defp get_request(ref, reqs) do + Enum.find(reqs, fn + {^ref, _} -> true + _ -> false + end) + end + + defp remove_request(ref, requests) do + Enum.filter(requests, fn + {^ref, _} -> false + {_, _} -> true + end) + end + + defp cast_sync(name, nodes, key, value) do + Enum.each(nodes, fn n -> + :gen_statem.cast({name, n}, {:sync, [{key, value}]}) + end) + end + + defp get_existing_nodes(ring) do + Enum.reduce(Node.list(), ring, fn n, acc -> + HashRing.add_node(acc, n) + end) + end +end diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..e8d9540 --- /dev/null +++ b/mix.exs @@ -0,0 +1,29 @@ +defmodule ClusterKv.MixProject do + use Mix.Project + + def project do + [ + app: :cluster_kv, + version: "0.1.0", + elixir: "~> 1.9", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger] + ] + end + + defp deps do + [ + {:libcluster, "~> 3.2"}, + {:libring, "~> 1.4"}, + {:rocksdb, "~> 1.5"}, + {:states_language, "~> 0.2"} + ] + end +end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..b4f29f9 --- /dev/null +++ b/mix.lock @@ -0,0 +1,13 @@ +%{ + "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, + "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"}, + "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, + "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"}, + "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, + "rocksdb": {:hex, :rocksdb, "1.5.1", "016861fbfb1f76ec2c48f745fca1f6e140fc0f27adc8c389c4cabce4a79b5f2f", [:rebar3], [], "hexpm", "63db7dfd65082d61df92810f61bd934080c77bc57eb711b36bb56a8926bdf3b0"}, + "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, + "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, + "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"}, +} diff --git a/priv/ring.json b/priv/ring.json new file mode 100644 index 0000000..4a07b37 --- /dev/null +++ b/priv/ring.json @@ -0,0 +1,112 @@ +{ + "StartAt": "Init", + "Comment": "", + "States": { + "Init": { + "Type": "Task", + "Resource": "HandleInit", + "TransitionEvent": ":initialized", + "Catch": [], + "InputPath": "", + "OutputPath": "", + "ResourcePath": "", + "Next": "Quorum", + "End": false + }, + "Ready": { + "Type": "Choice", + "Resource": "HandleReady", + "Choices": [ + { + "StringEquals": ":node_up", + "Next": "ReadyNodeUp" + }, + { + "StringEquals": ":node_down", + "Next": "ReadyNodeDown" + }, + { + "StringEquals": ":anti_entropy", + "Next": "AntiEntropy" + } + ], + "InputPath": "", + "OutputPath": "" + }, + "QuorumNodeUp": { + "Type": "Task", + "Resource": "HandleNodeUp", + "TransitionEvent": ":node_added", + "Catch": [], + "InputPath": "", + "OutputPath": "", + "ResourcePath": "", + "Next": "Quorum", + "End": false + }, + "ReadyNodeUp": { + "Type": "Task", + "Resource": "HandleNodeUp", + "TransitionEvent": ":node_added", + "Catch": [], + "InputPath": "", + "OutputPath": "", + "ResourcePath": "", + "Next": "Ready", + "End": false + }, + "Quorum": { + "Type": "Choice", + "Resource": "AwaitQuorum", + "Choices": [ + { + "StringEquals": ":quorum", + "Next": "Ready" + }, + { + "StringEquals": ":node_up", + "Next": "QuorumNodeUp" + }, + { + "StringEquals": ":node_down", + "Next": "QuorumNodeDown" + } + ], + "InputPath": "", + "OutputPath": "" + }, + "ReadyNodeDown": { + "Type": "Task", + "Resource": "HandleNodeDown", + "TransitionEvent": ":node_removed", + "Catch": [], + "InputPath": "", + "OutputPath": "", + "ResourcePath": "", + "Next": "Quorum", + "End": false + }, + "QuorumNodeDown": { + "Type": "Task", + "Resource": "HandleNodeDown", + "TransitionEvent": ":node_removed", + "Catch": [], + "InputPath": "", + "OutputPath": "", + "ResourcePath": "", + "Next": "Quorum", + "End": false + }, + "AntiEntropy": { + "Type": "Task", + "Resource": "HandleAntiEntropy", + "TransitionEvent": ":done", + "Catch": [], + "InputPath": "", + "OutputPath": "", + "ResourcePath": "", + "Next": "Ready", + "End": false + } + } +} diff --git a/test/cluster_kv_test.exs b/test/cluster_kv_test.exs new file mode 100644 index 0000000..431dddf --- /dev/null +++ b/test/cluster_kv_test.exs @@ -0,0 +1,8 @@ +defmodule ClusterKvTest do + use ExUnit.Case + doctest ClusterKv + + test "greets the world" do + assert ClusterKv.hello() == :world + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start()