Documentation for ClusterKV.
"""
use Supervisor
- alias ClusterKV.{Ring, DB}
+ alias ClusterKV.{DB, ETSTable, Ring}
@spec start_link(
name: atom(),
db = db_name(name)
ring = ring_name(name)
cluster_supervisor = cluster_supervisor_name(name)
+ ets = ets_name(name)
ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}
children = [
- {DB, [name: db]},
+ {ETSTable, [name: ets]},
+ :poolboy.child_spec(db, [name: {:local, db}, worker_module: DB, size: 8, max_overflow: 10],
+ table: ets
+ ),
{Ring, [{:local, ring}, ring_data, []]},
{Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}
]
end
def db_name(name), do: Module.concat([name, DB])
+ def ets_name(name), do: Module.concat([name, ETSTable])
def ring_name(name), do: Module.concat([name, Ring])
def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor])
alias __MODULE__
- @db_options [:set, {:read_concurrency, true}]
-
defstruct [:db, :batch_chunk, :batch_fun, :batch_ref, last_batch: 0, batch: []]
@type element :: {key :: String.t(), value :: any()}
@spec get(name :: module(), key :: binary(), timeout :: non_neg_integer() | :infinity) ::
element() | :not_found
def get(name, key, timeout \\ :infinity) do
- GenServer.call(name, {:get, key}, timeout)
+ :poolboy.transaction(name, fn w ->
+ GenServer.call(w, {:get, key}, timeout)
+ end)
end
@spec put(name :: module(), key :: binary(), value :: term()) :: :ok
def put(name, key, value) do
- GenServer.cast(name, {:put, key, value})
+ :poolboy.transaction(name, fn w ->
+ GenServer.cast(w, {:put, key, value})
+ end)
end
@spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok
def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
- GenServer.cast(name, {:upsert, key, value, fun})
+ :poolboy.transaction(name, fn w ->
+ GenServer.cast(w, {:upsert, key, value, fun})
+ end)
end
@spec batch(name :: module(), batch :: [element()], chunk :: integer(), fun :: fun()) :: :ok
def batch(name, batch, chunk \\ 10, fun \\ &{elem(&1, 0), &2}) do
- GenServer.cast(name, {:batch, batch, chunk, fun})
+ :poolboy.transaction(name, fn w ->
+ GenServer.cast(w, {:batch, batch, chunk, fun})
+ end)
end
@spec stream(name :: module(), timeout :: non_neg_integer() | :infinity) :: Enumerable.t()
def stream(name, timeout \\ :infinity) do
- GenServer.call(name, :stream, timeout)
+ :poolboy.transaction(name, fn w ->
+ GenServer.call(w, :stream, timeout)
+ end)
end
- @spec start_link(name: module()) :: GenServer.on_start()
- def start_link(name: name) do
- GenServer.start_link(__MODULE__, name, name: name)
+ @spec start_link(table: :ets.tid() | atom()) :: GenServer.on_start()
+ def start_link(table: table) do
+ GenServer.start_link(__MODULE__, table)
end
- def init(name) do
- Logger.debug("Opening ETS Table")
- db = :ets.new(Module.concat([name, ETSTable]), @db_options)
- {:ok, %DB{db: db}}
+ def init(table) do
+ {:ok, %DB{db: table}}
end
def handle_call(:stream, _from, %DB{db: db} = state) do
--- /dev/null
+defmodule ClusterKV.ETSTable do
+ use GenServer
+ @db_options [:set, :public, :named_table, {:read_concurrency, true}, {:write_concurrency, true}]
+
+ @spec create_table(name :: module()) :: :ets.tab()
+ def create_table(name) do
+ :ets.new(name, @db_options)
+ end
+
+ def start_link(name: name) do
+ GenServer.start_link(__MODULE__, name)
+ end
+
+ def init(name) do
+ create_table(name)
+ {:ok, %{}}
+ end
+end
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
{:libcluster, "~> 3.2"},
{:libring, "~> 1.4"},
+ {:poolboy, "~> 1.5"},
{:states_language, "~> 0.2"}
]
end
"conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
"elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
+ "erlex": {:hex, :erlex, "0.2.5", "e51132f2f472e13d606d808f0574508eeea2030d487fc002b46ad97e738b0510", [:mix], [], "hexpm", "756d3e19b056339af674b715fdd752c5dac468cf9d0e2d1a03abf4574e99fbf8"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
"libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
+ "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
"xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"},