From a8353fbb87e374e5e3abfc1a30495ce3c6452664 Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 9 Mar 2020 13:46:45 -0500 Subject: [PATCH] add worker pool for ets access --- lib/cluster_kv.ex | 9 +++++++-- lib/cluster_kv/db.ex | 34 ++++++++++++++++++++-------------- lib/cluster_kv/ets_table.ex | 18 ++++++++++++++++++ mix.exs | 1 + mix.lock | 2 ++ 5 files changed, 48 insertions(+), 16 deletions(-) create mode 100644 lib/cluster_kv/ets_table.ex diff --git a/lib/cluster_kv.ex b/lib/cluster_kv.ex index 4910122..68e6f3d 100644 --- a/lib/cluster_kv.ex +++ b/lib/cluster_kv.ex @@ -3,7 +3,7 @@ defmodule ClusterKV do Documentation for ClusterKV. """ use Supervisor - alias ClusterKV.{Ring, DB} + alias ClusterKV.{DB, ETSTable, Ring} @spec start_link( name: atom(), @@ -27,10 +27,14 @@ defmodule ClusterKV do db = db_name(name) ring = ring_name(name) cluster_supervisor = cluster_supervisor_name(name) + ets = ets_name(name) ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db} children = [ - {DB, [name: db]}, + {ETSTable, [name: ets]}, + :poolboy.child_spec(db, [name: {:local, db}, worker_module: DB, size: 8, max_overflow: 10], + table: ets + ), {Ring, [{:local, ring}, ring_data, []]}, {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]} ] @@ -39,6 +43,7 @@ defmodule ClusterKV do end def db_name(name), do: Module.concat([name, DB]) + def ets_name(name), do: Module.concat([name, ETSTable]) def ring_name(name), do: Module.concat([name, Ring]) def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor]) diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index 8f1e17f..3e99b6a 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -4,8 +4,6 @@ defmodule ClusterKV.DB do alias __MODULE__ - @db_options [:set, {:read_concurrency, true}] - defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []] @type element :: {key :: String.t(), value :: any()} @@ -22,38 +20,46 @@ defmodule ClusterKV.DB do @spec get(name :: module(), key :: binary(), timeout :: non_neg_integer() | :infinity) :: element() | :not_found def get(name, key, timeout \\ :infinity) do - GenServer.call(name, {:get, key}, timeout) + :poolboy.transaction(name, fn w -> + GenServer.call(w, {:get, key}, timeout) + end) end @spec put(name :: module(), key :: binary(), value :: term()) :: :ok def put(name, key, value) do - GenServer.cast(name, {:put, key, value}) + :poolboy.transaction(name, fn w -> + GenServer.cast(w, {:put, key, value}) + end) end @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do - GenServer.cast(name, {:upsert, key, value, fun}) + :poolboy.transaction(name, fn w -> + GenServer.cast(w, {:upsert, key, value, fun}) + end) end @spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do - GenServer.cast(name, {:batch, batch, chunk, fun}) + :poolboy.transaction(name, fn w -> + GenServer.cast(w, {:batch, batch, chunk, fun}) + end) end @spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t() def stream(name, timeout \\ :infinity) do - GenServer.call(name, :stream, timeout) + :poolboy.transaction(name, fn w -> + GenServer.call(w, :stream, timeout) + end) end - @spec start_link(name: module()) :: GenServer.on_start() - def start_link(name: name) do - GenServer.start_link(__MODULE__, name, name: name) + @spec start_link(table: :ets.tid() | atom()) :: GenServer.on_start() + def start_link(table: table) do + GenServer.start_link(__MODULE__, table) end - def init(name) do - Logger.debug("Opening ETS Table") - db = :ets.new(Module.concat([name, ETSTable]), @db_options) - {:ok, %DB{db: db}} + def init(table) do + {:ok, %DB{db: table}} end def handle_call(:stream, _from, %DB{db: db} = state) do diff --git a/lib/cluster_kv/ets_table.ex b/lib/cluster_kv/ets_table.ex new file mode 100644 index 0000000..31b11ea --- /dev/null +++ b/lib/cluster_kv/ets_table.ex @@ -0,0 +1,18 @@ +defmodule ClusterKV.ETSTable do + use GenServer + @db_options [:set, :public, :named_table, {:read_concurrency, true}, {:write_concurrency, true}] + + @spec create_table(name :: module()) :: :ets.tab() + def create_table(name) do + :ets.new(name, @db_options) + end + + def start_link(name: name) do + GenServer.start_link(__MODULE__, name) + end + + def init(name) do + create_table(name) + {:ok, %{}} + end +end diff --git a/mix.exs b/mix.exs index fd148ae..cbd99ba 100644 --- a/mix.exs +++ b/mix.exs @@ -23,6 +23,7 @@ defmodule ClusterKv.MixProject do {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, {:libcluster, "~> 3.2"}, {:libring, "~> 1.4"}, + {:poolboy, "~> 1.5"}, {:states_language, "~> 0.2"} ] end diff --git a/mix.lock b/mix.lock index 1600408..9583efc 100644 --- a/mix.lock +++ b/mix.lock @@ -2,11 +2,13 @@ "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"}, + "erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"}, "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"}, "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"}, -- 2.45.3