]> Entropealabs - cluster_kv.git/commitdiff
add worker pool for ets access
authorChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 18:46:45 +0000 (13:46 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 9 Mar 2020 18:46:45 +0000 (13:46 -0500)
lib/cluster_kv.ex
lib/cluster_kv/db.ex
lib/cluster_kv/ets_table.ex [new file with mode: 0644]
mix.exs
mix.lock

index 49101226d3957cd9a07368b49f1d9bfe9c963bca..68e6f3dafecc86f75f72b8d06e58a4984a2d9583 100644 (file)
@@ -3,7 +3,7 @@ defmodule ClusterKV do
   Documentation for ClusterKV.
   """
   use Supervisor
-  alias ClusterKV.{Ring, DB}
+  alias ClusterKV.{DB, ETSTable, Ring}
 
   @spec start_link(
           name: atom(),
@@ -27,10 +27,14 @@ defmodule ClusterKV do
     db = db_name(name)
     ring = ring_name(name)
     cluster_supervisor = cluster_supervisor_name(name)
+    ets = ets_name(name)
     ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}
 
     children = [
-      {DB, [name: db]},
+      {ETSTable, [name: ets]},
+      :poolboy.child_spec(db, [name: {:local, db}, worker_module: DB, size: 8, max_overflow: 10],
+        table: ets
+      ),
       {Ring, [{:local, ring}, ring_data, []]},
       {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}
     ]
@@ -39,6 +43,7 @@ defmodule ClusterKV do
   end
 
   def db_name(name), do: Module.concat([name, DB])
+  def ets_name(name), do: Module.concat([name, ETSTable])
   def ring_name(name), do: Module.concat([name, Ring])
   def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor])
 
index 8f1e17fc20741ad3bf41a6233b8307fba61ef277..3e99b6a001d41f8ed5b7c7aa34747a3fcbbdf7a7 100644 (file)
@@ -4,8 +4,6 @@ defmodule ClusterKV.DB do
 
   alias __MODULE__
 
-  @db_options [:set, {:read_concurrency, true}]
-
   defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []]
 
   @type element :: {key :: String.t(), value :: any()}
@@ -22,38 +20,46 @@ defmodule ClusterKV.DB do
   @spec get(name :: module(), key :: binary(), timeout :: non_neg_integer() | :infinity) ::
           element() | :not_found
   def get(name, key, timeout \\ :infinity) do
-    GenServer.call(name, {:get, key}, timeout)
+    :poolboy.transaction(name, fn w ->
+      GenServer.call(w, {:get, key}, timeout)
+    end)
   end
 
   @spec put(name :: module(), key :: binary(), value :: term()) :: :ok
   def put(name, key, value) do
-    GenServer.cast(name, {:put, key, value})
+    :poolboy.transaction(name, fn w ->
+      GenServer.cast(w, {:put, key, value})
+    end)
   end
 
   @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok
   def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
-    GenServer.cast(name, {:upsert, key, value, fun})
+    :poolboy.transaction(name, fn w ->
+      GenServer.cast(w, {:upsert, key, value, fun})
+    end)
   end
 
   @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok
   def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do
-    GenServer.cast(name, {:batch, batch, chunk, fun})
+    :poolboy.transaction(name, fn w ->
+      GenServer.cast(w, {:batch, batch, chunk, fun})
+    end)
   end
 
   @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t()
   def stream(name, timeout \\ :infinity) do
-    GenServer.call(name, :stream, timeout)
+    :poolboy.transaction(name, fn w ->
+      GenServer.call(w, :stream, timeout)
+    end)
   end
 
-  @spec start_link(name: module()) :: GenServer.on_start()
-  def start_link(name: name) do
-    GenServer.start_link(__MODULE__, name, name: name)
+  @spec start_link(table: :ets.tid() | atom()) :: GenServer.on_start()
+  def start_link(table: table) do
+    GenServer.start_link(__MODULE__, table)
   end
 
-  def init(name) do
-    Logger.debug("Opening ETS Table")
-    db = :ets.new(Module.concat([name, ETSTable]), @db_options)
-    {:ok, %DB{db: db}}
+  def init(table) do
+    {:ok, %DB{db: table}}
   end
 
   def handle_call(:stream, _from, %DB{db: db} = state) do
diff --git a/lib/cluster_kv/ets_table.ex b/lib/cluster_kv/ets_table.ex
new file mode 100644 (file)
index 0000000..31b11ea
--- /dev/null
@@ -0,0 +1,18 @@
+defmodule ClusterKV.ETSTable do
+  use GenServer
+  @db_options [:set, :public, :named_table, {:read_concurrency, true}, {:write_concurrency, true}]
+
+  @spec create_table(name :: module()) :: :ets.tab()
+  def create_table(name) do
+    :ets.new(name, @db_options)
+  end
+
+  def start_link(name: name) do
+    GenServer.start_link(__MODULE__, name)
+  end
+
+  def init(name) do
+    create_table(name)
+    {:ok, %{}}
+  end
+end
diff --git a/mix.exs b/mix.exs
index fd148ae72327f5d933bbef5c43efc2f74b0e0bf5..cbd99baaff8e070d14a1354c54b24fe270d4cd85 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -23,6 +23,7 @@ defmodule ClusterKv.MixProject do
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
       {:libcluster, "~> 3.2"},
       {:libring, "~> 1.4"},
+      {:poolboy, "~> 1.5"},
       {:states_language, "~> 0.2"}
     ]
   end
index 1600408a2365002a698611300d215f585ea73ca7..9583efc2f1469e1066b515e0e1503fd4650b6f64 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -2,11 +2,13 @@
   "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
   "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
   "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
+  "erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"},
   "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
   "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
   "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
   "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
   "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
+  "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
   "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
   "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
   "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"},