defmodule ClusterKV.DB do
+ @moduledoc false
use GenServer
require Logger
defmodule ClusterKV.ETSTable do
+ @moduledoc false
use GenServer
@db_options [:set, :public, :named_table, {:read_concurrency, true}, {:write_concurrency, true}]
defmodule ClusterKV.Ring do
+ @moduledoc false
@external_resource "priv/ring.json"
use StatesLanguage, data: "priv/ring.json"
require Logger
parts = Enum.with_index(parts)
Enum.filter(vals, fn {p, _val} ->
- Enum.all?(parts, fn {k, i} ->
- case Enum.at(p, i) do
- ^k -> true
- ^wildcard -> true
- _ -> false
- end
- end)
+ all_wildcard(parts, p, wildcard)
end)
end
{:ok, sl, []}
end
+ defp all_wildcard(parts, p, wildcard) do
+ Enum.all?(parts, fn {k, i} ->
+ case Enum.at(p, i) do
+ ^k -> true
+ ^wildcard -> true
+ _ -> false
+ end
+ end)
+ end
+
@spec get_prefix(
next :: [String.t()],
ring :: HashRing.t(),
|> DB.stream()
|> Enum.reduce(%{}, fn {k, v}, acc ->
old_nodes = HashRing.key_to_nodes(old_ring, k, repls) |> MapSet.new()
- new_nodes = HashRing.key_to_nodes(new_ring, k, repls) |> MapSet.new()
- nodes = MapSet.difference(new_nodes, old_nodes) |> MapSet.to_list()
-
- Enum.reduce(nodes, acc, fn
- ^me, a ->
- a
- n, a ->
- Map.update(a, n, [], &[{k, v} | &1])
- end)
+ new_ring
+ |> HashRing.key_to_nodes(k, repls)
+ |> MapSet.new()
+ |> MapSet.difference(old_nodes)
+ |> MapSet.to_list()
+ |> reduce_nodes(me, k, v, acc)
end)
send_batch(name, batch)
end)
end
+ defp reduce_nodes(nodes, me, k, v, acc) do
+ Enum.reduce(nodes, acc, fn
+ ^me, a ->
+ a
+
+ n, a ->
+ Map.update(a, n, [], &[{k, v} | &1])
+ end)
+ end
+
@spec maybe_reply(ref :: reference(), requests :: [Requests.t()], val :: any(), node :: node()) ::
{[Requests.t()], [:gen_statem.action()]}
defp maybe_reply(ref, reqs, val, node) do
version: "0.1.0",
elixir: "~> 1.9",
start_permanent: Mix.env() == :prod,
+ aliases: aliases(),
deps: deps()
]
end
defp deps do
[
+ {:credo, "~> 1.2", only: [:test], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
{:libcluster, "~> 3.2"},
{:libring, "~> 1.4"},
{:states_language, "~> 0.2"}
]
end
+
+ defp aliases do
+ [
+ all_tests: [
+ "compile --force --warnings-as-errors",
+ "credo --strict",
+ "format --check-formatted",
+ "test",
+ "dialyzer --halt-exit-status"
+ ]
+ ]
+ end
end
%{
+ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
+ "credo": {:hex, :credo, "1.3.2", "08d456dcf3c24da162d02953fb07267e444469d8dad3a2ae47794938ea467b3a", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b11d28cce1f1f399dddffd42d8e21dcad783309e230f84b70267b1a5546468b6"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
"elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},