]> Entropealabs - cluster_kv.git/commitdiff
move to ets tables
authorChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 05:47:07 +0000 (23:47 -0600)
committerChristopher <chris@entropealabs.com>
Sat, 7 Mar 2020 05:47:07 +0000 (23:47 -0600)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex
mix.exs
mix.lock

index 2d3e50460973dd48d95ded2327be931a8bcd68d2..9a40d131f9e8fa344f7401b443caa038011636b8 100644 (file)
@@ -7,7 +7,6 @@ defmodule ClusterKV do
 
   @spec start_link(
           name: atom(),
-          db_path: charlist() | String.t(),
           topologies: list(),
           replicas: integer(),
           quorum: integer()
@@ -17,22 +16,21 @@ defmodule ClusterKV do
 
   def start_link(
         name: name,
-        db_path: db_path,
         topologies: topologies,
         replicas: replicas,
         quorum: quorum
       ) do
-    Supervisor.start_link(__MODULE__, {name, db_path, topologies, replicas, quorum}, name: name)
+    Supervisor.start_link(__MODULE__, {name, topologies, replicas, quorum}, name: name)
   end
 
-  def init({name, db_path, topologies, replicas, quorum}) do
+  def init({name, topologies, replicas, quorum}) do
     db = db_name(name)
     ring = ring_name(name)
     cluster_supervisor = cluster_supervisor_name(name)
     ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}
 
     children = [
-      {DB, [name: db, db_path: db_path]},
+      {DB, [name: db]},
       {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]},
       {Ring, [{:local, ring}, ring_data, []]}
     ]
@@ -46,5 +44,5 @@ defmodule ClusterKV do
 
   defdelegate get(name, key), to: Ring
   defdelegate put(name, key, value), to: Ring
-  defdelegate prefix(name, prefix), to: Ring
+  defdelegate prefix(name, key, split_on, min), to: Ring
 end
index f49cc399c07093e5b88baca6128fce48251480b0..9d7ccc13661175f49e6a0a856d089e512e3a58ef 100644 (file)
@@ -4,94 +4,44 @@ defmodule ClusterKV.DB do
 
   alias __MODULE__
 
-  @db_options [
-    {:create_if_missing, true},
-    {:merge_operator, :erlang_merge_operator},
-    {:capped_prefix_extractor, 255}
-  ]
+  @db_options [:set, :private]
 
   defstruct [:db]
 
   @type t :: %__MODULE__{
-          db: :rocksdb.db_handle()
+          db: :ets.tid() | atom()
         }
 
-  @spec get(name :: module(), key :: binary()) ::
-          {:ok, term()} | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
+  @spec get(name :: module(), key :: binary()) :: [tuple()]
   def get(name, key) do
     GenServer.call(name, {:get, key})
   end
 
-  @spec get_prefix(name :: module(), prefix :: binary()) ::
-          [{:ok, term()}] | :not_found | {:error, {:corruption, String.t()}} | {:error, any()}
-  def get_prefix(name, prefix) do
-    GenServer.call(name, {:get_prefix, prefix})
-  end
-
   @spec put(name :: module(), key :: binary(), value :: term()) :: :ok
   def put(name, key, value) do
     GenServer.cast(name, {:put, key, value})
   end
 
-  def start_link(name: name, db_path: db_path) do
-    GenServer.start_link(__MODULE__, db_path, name: name)
-  end
-
-  def init(db_path) when is_binary(db_path) do
-    init(to_charlist(db_path))
+  def start_link(name: name) do
+    GenServer.start_link(__MODULE__, name, name: name)
   end
 
-  def init(db_path) when is_list(db_path) do
-    Logger.info("Opening database at #{db_path}")
-    {:ok, db} = :rocksdb.open(db_path, @db_options)
+  def init(name) do
+    Logger.debug("Opening ETS Table")
+    db = :ets.new(Module.concat([name, ETSTable]), @db_options)
     {:ok, %DB{db: db}}
   end
 
   def handle_call({:get, key}, _from, %DB{db: db} = state) do
     {:reply,
-     db
-     |> :rocksdb.get(key, [])
-     |> deserialize(), state}
-  end
-
-  def handle_call({:get_prefix, prefix}, _from, %DB{db: db} = state) do
-    {:reply, seek_iterator(db, prefix), state}
+     case :ets.lookup(db, key) do
+       [] -> :not_found
+       [other] -> other
+     end, state}
   end
 
   def handle_cast({:put, key, value}, %DB{db: db} = state) do
-    :rocksdb.put(db, key, serialize(value), [])
+    :ets.insert(db, {key, value})
     {:noreply, state}
   end
-
-  def seek_iterator(db, prefix, acc \\ []) do
-    {:ok, itr} = :rocksdb.iterator(db, [])
-    seek_loop(:rocksdb.iterator_move(itr, {:seek, prefix}), prefix, itr, acc)
-  end
-
-  def seek_loop({:error, :iterator_closed}, _pre, _itr, acc), do: acc
-  def seek_loop({:error, :invalid_iterator}, _pre, _itr, acc), do: acc
-
-  def seek_loop({:ok, key, val}, prefix, itr, acc) do
-    case String.starts_with?(key, prefix) do
-      true ->
-        seek_loop(
-          :rocksdb.iterator_move(itr, :next),
-          prefix,
-          itr,
-          [{key, :erlang.binary_to_term(val)} | acc]
-        )
-
-      false ->
-        acc
-    end
-  end
-
-  def deserialize({:ok, val}), do: {:ok, :erlang.binary_to_term(val)}
-  def deserialize(other), do: other
-
-  def serialize(val), do: :erlang.term_to_binary(val)
-
-  def terminate(_, %{db: db}) do
-    :rocksdb.close(db)
-  end
 end
index 871f6fba839952f584a1056640d747d7bd8946e1..dce336316c6c66d2c204fc4a75e3d215146a2b07 100644 (file)
@@ -54,9 +54,9 @@ defmodule ClusterKV.Ring do
     :gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
   end
 
-  def prefix(name, prefix) do
-    Logger.debug("PREFIX - #{prefix}")
-    :gen_statem.call(ClusterKV.ring_name(name), {:prefix, prefix})
+  def prefix(name, key, split_on, min) do
+    Logger.debug("PREFIX - #{key} / #{inspect(split_on)} : #{min}")
+    :gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
   end
 
   def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
@@ -103,13 +103,7 @@ defmodule ClusterKV.Ring do
         %SL{data: %Ring{name: n, node: node, requests: reqs, ring: r, replicas: repls} = data} =
           sl
       ) do
-    nodes = HashRing.key_to_nodes(r, key, repls)
-
-    node =
-      case node in nodes do
-        true -> node
-        false -> Enum.random(nodes)
-      end
+    node = get_node(key, r, node, repls)
 
     ref = make_ref()
     send({n, node}, {:get_key, key, ref, node})
@@ -119,20 +113,24 @@ defmodule ClusterKV.Ring do
   def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
 
   def handle_call(
-        {:prefix, prefix},
+        {:prefix, key, split_on, min},
         from,
         @ready,
-        %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r} = data} = sl
+        %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
       ) do
-    nodes = HashRing.nodes(r)
     ref = make_ref()
-
-    Enum.each(nodes, fn node ->
-      Logger.debug("Prefix request to #{inspect(node)}")
-      send({n, node}, {:get_prefix, prefix, ref, me})
+    parts = String.split(key, split_on)
+    head = Enum.slice(parts, 0..(min - 1))
+    itr = Enum.slice(parts, min..-2)
+    get_prefix(head, key, r, me, repls, n, ref)
+
+    Enum.reduce(itr, head, fn ns, acc ->
+      next = acc ++ [ns]
+      get_prefix(next, key, r, me, repls, n, ref)
+      next
     end)
 
-    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(nodes), []} | reqs]}}, []}
+    {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(itr) + 1, []} | reqs]}}, []}
   end
 
   def handle_call({:prefix, _prefix}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
@@ -165,16 +163,6 @@ defmodule ClusterKV.Ring do
     {:ok, sl, []}
   end
 
-  def handle_info(
-        {:get_prefix, prefix, ref, node},
-        @ready,
-        %SL{data: %Ring{name: n, db: db}} = sl
-      ) do
-    val = DB.get_prefix(db, prefix)
-    send({n, node}, {:reply, val, ref})
-    {:ok, sl, []}
-  end
-
   def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
     {requests, actions} = maybe_reply(ref, reqs, val)
     {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
@@ -194,6 +182,21 @@ defmodule ClusterKV.Ring do
      [{:next_event, :internal, :node_down}]}
   end
 
+  defp get_prefix(next, key, r, me, repls, n, ref) do
+    prefix = Enum.join(next, ".")
+    node = get_node(key, r, me, repls)
+    send({n, node}, {:get_key, prefix, ref, me})
+  end
+
+  defp get_node(key, r, node, repls) do
+    nodes = HashRing.key_to_nodes(r, key, repls)
+
+    case node in nodes do
+      true -> node
+      false -> Enum.random(nodes)
+    end
+  end
+
   defp maybe_reply(ref, reqs, val) do
     case get_request(ref, reqs) do
       {_, from, 1, res} ->
@@ -207,7 +210,13 @@ defmodule ClusterKV.Ring do
   defp get_final_value([], val), do: val
 
   defp get_final_value(res, val) when is_list(res),
-    do: {:ok, MapSet.new(List.flatten([val | res]))}
+    do:
+      {:ok,
+       [val | res]
+       |> Enum.filter(fn
+         :not_found -> false
+         _ -> true
+       end)}
 
   defp get_request(ref, reqs) do
     Enum.find(reqs, fn
diff --git a/mix.exs b/mix.exs
index e8d9540f8f7659d5fee6b130ac28463525352487..1d6993269291be0f9e1326237b1c107f7bd9c5eb 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -22,7 +22,6 @@ defmodule ClusterKv.MixProject do
     [
       {:libcluster, "~> 3.2"},
       {:libring, "~> 1.4"},
-      {:rocksdb, "~> 1.5"},
       {:states_language, "~> 0.2"}
     ]
   end
index b4f29f9853d0afc0ed41820b91120aae5d2036f3..a42d739f970e7cdf5d269ab4e599a30bc9591e6a 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -6,7 +6,6 @@
   "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
   "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
   "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
-  "rocksdb": {:hex, :rocksdb, "1.5.1", "016861fbfb1f76ec2c48f745fca1f6e140fc0f27adc8c389c4cabce4a79b5f2f", [:rebar3], [], "hexpm", "63db7dfd65082d61df92810f61bd934080c77bc57eb711b36bb56a8926bdf3b0"},
   "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
   "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
   "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"},