From: Christopher Coté Date: Wed, 25 Nov 2020 03:42:05 +0000 (-0600) Subject: update cluster_kv to use local update functions X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=d58d5f7e046723553ea8bdf85012725bf7c6ba94;p=wampex_router.git update cluster_kv to use local update functions --- diff --git a/lib/router/realms/session.ex b/lib/router/realms/session.ex index 0d2e830..62c466f 100644 --- a/lib/router/realms/session.ex +++ b/lib/router/realms/session.ex @@ -12,7 +12,7 @@ defmodule Wampex.Router.Realms.Session do def unregister(db, realm, {id, _from} = value, regs) do {_id, proc} = reg = get_registration(id, regs) - ClusterKV.update(db, realm, proc, value, &upsert_remove/2) + ClusterKV.update(db, realm, proc, value, :replace) List.delete(regs, reg) end @@ -41,7 +41,7 @@ defmodule Wampex.Router.Realms.Session do def get_last_index([_ | t]), do: get_last_index(t) def round_robin(db, realm, procedure, len) do - ClusterKV.update(db, realm, "#{procedure}:callee_index", len, &update_round_robin/2) + ClusterKV.update(db, realm, "#{procedure}:callee_index", len, :round_robin) end def subscribe(db, realm, topic, val, %{"match" => "wildcard"}) do @@ -57,17 +57,16 @@ defmodule Wampex.Router.Realms.Session do def unsubscribe(db, realm, {id, _from} = value, subscriptions) do {_id, topic, wc} = sub = get_subscription(id, subscriptions) - {key, filter} = + key = case wc do true -> - key = get_wildcard_key(topic) - {key, &upsert_remove/2} + get_wildcard_key(topic) false -> - {topic, &upsert_remove/2} + topic end - ClusterKV.update(db, realm, key, value, filter) + ClusterKV.update(db, realm, key, value, :remove) remove_val(sub, subscriptions) end @@ -93,22 +92,6 @@ defmodule Wampex.Router.Realms.Session do current_id + 1 end - defp update_round_robin({id, v}, value) do - val = - case v do - [values] -> values - [_ | t] -> Enum.reduce(t, 0, fn a, _acc -> a end) - end - - case val + 1 do - ni when ni < value -> - {id, [ni]} - - _ -> - {id, [0]} - end - end - defp get_subscribers({db, {keyspace, key}, acc}) do case ClusterKV.get(db, keyspace, key, 500) do {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers} @@ -151,8 +134,6 @@ defmodule Wampex.Router.Realms.Session do List.delete(values, val) end - defp upsert_remove({id, values}, value), do: {id, remove_val(value, values)} - defp put_wildcard(db, realm, topic, val), do: ClusterKV.put_wildcard(db, realm, topic, val, ".", ":", "") defp get_wildcard_key(topic), do: ClusterKV.get_wildcard_key(topic, ".", ":", "") end diff --git a/mix.exs b/mix.exs index b719572..558a20e 100644 --- a/mix.exs +++ b/mix.exs @@ -27,7 +27,7 @@ defmodule Wampex.Router.MixProject do defp deps do [ {:cluster_kv, - git: "https://gitlab.com/entropealabs/cluster_kv.git", tag: "e1dfa31b10afba62d513688f21da64c3629ea65c"}, + git: "https://gitlab.com/entropealabs/cluster_kv.git", tag: "75494e6fa3725a1237d423cb3181ceeaf4205bd6"}, {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:test], runtime: false}, diff --git a/mix.lock b/mix.lock index 08ab71b..26b347f 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, - "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "e1dfa31b10afba62d513688f21da64c3629ea65c", [tag: "e1dfa31b10afba62d513688f21da64c3629ea65c"]}, + "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "75494e6fa3725a1237d423cb3181ceeaf4205bd6", [tag: "75494e6fa3725a1237d423cb3181ceeaf4205bd6"]}, "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"}, "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},