From b8dab5e15fef8cab8195af2ba1dd16283921c412 Mon Sep 17 00:00:00 2001 From: Christopher Date: Fri, 6 Mar 2020 23:47:07 -0600 Subject: [PATCH] move to ets tables --- lib/cluster_kv.ex | 10 +++--- lib/cluster_kv/db.ex | 76 ++++++++---------------------------------- lib/cluster_kv/ring.ex | 67 +++++++++++++++++++++---------------- mix.exs | 1 - mix.lock | 1 - 5 files changed, 55 insertions(+), 100 deletions(-) diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 2d3e504..9a40d13 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -7,7 +7,6 @@ defmodule ClusterKV do @spec start_link( name: atom(), - db_path: charlist() | String.t(), topologies: list(), replicas: integer(), quorum: integer() @@ -17,22 +16,21 @@ defmodule ClusterKV do def start_link( name: name, - db_path: db_path, topologies: topologies, replicas: replicas, quorum: quorum ) do - Supervisor.start_link(__MODULE__, {name, db_path, topologies, replicas, quorum}, name: name) + Supervisor.start_link(__MODULE__, {name, topologies, replicas, quorum}, name: name) end - def init({name, db_path, topologies, replicas, quorum}) do + def init({name, topologies, replicas, quorum}) do db = db_name(name) ring = ring_name(name) cluster_supervisor = cluster_supervisor_name(name) ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db} children = [ - {DB, [name: db, db_path: db_path]}, + {DB, [name: db]}, {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}, {Ring, [{:local, ring}, ring_data, []]} ] @@ -46,5 +44,5 @@ defmodule ClusterKV do defdelegate get(name, key), to: Ring defdelegate put(name, key, value), to: Ring - defdelegate prefix(name, prefix), to: Ring + defdelegate prefix(name, key, split_on, min), to: Ring end diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index f49cc39..9d7ccc1 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -4,94 +4,44 @@ defmodule ClusterKV.DB do alias __MODULE__ - @db_options [ - {:create_if_missing, true}, - {:merge_operator, :erlang_merge_operator}, - {:capped_prefix_extractor, 255} - ] + @db_options [:set, :private] defstruct [:db] @type t :: %__MODULE__{ - db: :rocksdb.db_handle() + db: :ets.tid() | atom() } - @spec get(name :: module(), key :: binary()) :: - {:ok, term()} | :not_found | {:error, {:corruption, String.t()}} | {:error, any()} + @spec get(name :: module(), key :: binary()) :: [tuple()] def get(name, key) do GenServer.call(name, {:get, key}) end - @spec get_prefix(name :: module(), prefix :: binary()) :: - [{:ok, term()}] | :not_found | {:error, {:corruption, String.t()}} | {:error, any()} - def get_prefix(name, prefix) do - GenServer.call(name, {:get_prefix, prefix}) - end - @spec put(name :: module(), key :: binary(), value :: term()) :: :ok def put(name, key, value) do GenServer.cast(name, {:put, key, value}) end - def start_link(name: name, db_path: db_path) do - GenServer.start_link(__MODULE__, db_path, name: name) - end - - def init(db_path) when is_binary(db_path) do - init(to_charlist(db_path)) + def start_link(name: name) do + GenServer.start_link(__MODULE__, name, name: name) end - def init(db_path) when is_list(db_path) do - Logger.info("Opening database at #{db_path}") - {:ok, db} = :rocksdb.open(db_path, @db_options) + def init(name) do + Logger.debug("Opening ETS Table") + db = :ets.new(Module.concat([name, ETSTable]), @db_options) {:ok, %DB{db: db}} end def handle_call({:get, key}, _from, %DB{db: db} = state) do {:reply, - db - |> :rocksdb.get(key, []) - |> deserialize(), state} - end - - def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do - {:reply, seek_iterator(db, prefix), state} + case :ets.lookup(db, key) do + [] -> :not_found + [other] -> other + end, state} end def handle_cast({:put, key, value}, %DB{db: db} = state) do - :rocksdb.put(db, key, serialize(value), []) + :ets.insert(db, {key, value}) {:noreply, state} end - - def seek_iterator(db, prefix, acc \\ []) do - {:ok, itr} = :rocksdb.iterator(db, []) - seek_loop(:rocksdb.iterator_move(itr, {:seek, prefix}), prefix, itr, acc) - end - - def seek_loop({:error, :iterator_closed}, _pre, _itr, acc), do: acc - def seek_loop({:error, :invalid_iterator}, _pre, _itr, acc), do: acc - - def seek_loop({:ok, key, val}, prefix, itr, acc) do - case String.starts_with?(key, prefix) do - true -> - seek_loop( - :rocksdb.iterator_move(itr, :next), - prefix, - itr, - [{key, :erlang.binary_to_term(val)} | acc] - ) - - false -> - acc - end - end - - def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)} - def deserialize(other), do: other - - def serialize(val), do: :erlang.term_to_binary(val) - - def terminate(_, %{db: db}) do - :rocksdb.close(db) - end end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index 871f6fb..dce3363 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -54,9 +54,9 @@ defmodule ClusterKV.Ring do :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout) end - def prefix(name, prefix) do - Logger.debug("PREFIX - #{prefix}") - :gen_statem.call(ClusterKV.ring_name(name), {:prefix, prefix}) + def prefix(name, key, split_on, min) do + Logger.debug("PREFIX - #{key} / #{inspect(split_on)} : #{min}") + :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min}) end def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do @@ -103,13 +103,7 @@ defmodule ClusterKV.Ring do %SL{data: %Ring{name: n, node: node, requests: reqs, ring: r, replicas: repls} = data} = sl ) do - nodes = HashRing.key_to_nodes(r, key, repls) - - node = - case node in nodes do - true -> node - false -> Enum.random(nodes) - end + node = get_node(key, r, node, repls) ref = make_ref() send({n, node}, {:get_key, key, ref, node}) @@ -119,20 +113,24 @@ defmodule ClusterKV.Ring do def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} def handle_call( - {:prefix, prefix}, + {:prefix, key, split_on, min}, from, @ready, - %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r} = data} = sl + %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl ) do - nodes = HashRing.nodes(r) ref = make_ref() - - Enum.each(nodes, fn node -> - Logger.debug("Prefix request to #{inspect(node)}") - send({n, node}, {:get_prefix, prefix, ref, me}) + parts = String.split(key, split_on) + head = Enum.slice(parts, 0..(min - 1)) + itr = Enum.slice(parts, min..-2) + get_prefix(head, key, r, me, repls, n, ref) + + Enum.reduce(itr, head, fn ns, acc -> + next = acc ++ [ns] + get_prefix(next, key, r, me, repls, n, ref) + next end) - {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(nodes), []} | reqs]}}, []} + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(itr) + 1, []} | reqs]}}, []} end def handle_call({:prefix, _prefix}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} @@ -165,16 +163,6 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end - def handle_info( - {:get_prefix, prefix, ref, node}, - @ready, - %SL{data: %Ring{name: n, db: db}} = sl - ) do - val = DB.get_prefix(db, prefix) - send({n, node}, {:reply, val, ref}) - {:ok, sl, []} - end - def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do {requests, actions} = maybe_reply(ref, reqs, val) {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions} @@ -194,6 +182,21 @@ defmodule ClusterKV.Ring do [{:next_event, :internal, :node_down}]} end + defp get_prefix(next, key, r, me, repls, n, ref) do + prefix = Enum.join(next, ".") + node = get_node(key, r, me, repls) + send({n, node}, {:get_key, prefix, ref, me}) + end + + defp get_node(key, r, node, repls) do + nodes = HashRing.key_to_nodes(r, key, repls) + + case node in nodes do + true -> node + false -> Enum.random(nodes) + end + end + defp maybe_reply(ref, reqs, val) do case get_request(ref, reqs) do {_, from, 1, res} -> @@ -207,7 +210,13 @@ defmodule ClusterKV.Ring do defp get_final_value([], val), do: val defp get_final_value(res, val) when is_list(res), - do: {:ok, MapSet.new(List.flatten([val | res]))} + do: + {:ok, + [val | res] + |> Enum.filter(fn + :not_found -> false + _ -> true + end)} defp get_request(ref, reqs) do Enum.find(reqs, fn diff --git a/mix.exs b/mix.exs index e8d9540..1d69932 100644 --- a/mix.exs +++ b/mix.exs @@ -22,7 +22,6 @@ defmodule ClusterKv.MixProject do [ {:libcluster, "~> 3.2"}, {:libring, "~> 1.4"}, - {:rocksdb, "~> 1.5"}, {:states_language, "~> 0.2"} ] end diff --git a/mix.lock b/mix.lock index b4f29f9..a42d739 100644 --- a/mix.lock +++ b/mix.lock @@ -6,7 +6,6 @@ "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"}, "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, - "rocksdb": {:hex, :rocksdb, "1.5.1", "016861fbfb1f76ec2c48f745fca1f6e140fc0f27adc8c389c4cabce4a79b5f2f", [:rebar3], [], "hexpm", "63db7dfd65082d61df92810f61bd934080c77bc57eb711b36bb56a8926bdf3b0"}, "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"}, -- 2.45.3