From 1d7a8c8c1e2aed205f0037a4fa0da23f8b63570e Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 21 Mar 2020 14:15:18 -0500 Subject: [PATCH] move realm sepcific logic to it's own module --- lib/router/admin.ex | 7 +- lib/router/realms/session.ex | 123 ++++++++++++++++++++++ lib/router/session.ex | 151 +++++++--------------------- lib/router/transports/web_socket.ex | 2 +- 4 files changed, 166 insertions(+), 117 deletions(-) create mode 100644 lib/router/realms/session.ex diff --git a/lib/router/admin.ex b/lib/router/admin.ex index 77e9c2c..16204d9 100644 --- a/lib/router/admin.ex +++ b/lib/router/admin.ex @@ -5,7 +5,8 @@ defmodule Wampex.Router.Admin do alias Wampex.Roles.Dealer.{Invocation, Result} alias Wampex.Roles.Peer.Error alias Wampex.Router.Authentication.{Peer, Realm} - alias Wampex.Router.{Realms, Session} + alias Wampex.Router.Realms + alias Realms.Session, as: RealmSession @procedures [ "admin.create_peer", @@ -29,8 +30,8 @@ defmodule Wampex.Router.Admin do Logger.info("Registering admin procedures: #{inspect(@procedures)}") Enum.reduce(@procedures, regs, fn proc, acc -> - val = {Session.get_global_id(), {self(), Node.self()}} - ClusterKV.put(db, realm, proc, val) + val = {RealmSession.get_id(), {self(), Node.self()}} + RealmSession.register(db, realm, proc, val) [{proc, val} | acc] end) end diff --git a/lib/router/realms/session.ex b/lib/router/realms/session.ex new file mode 100644 index 0000000..31aacbd --- /dev/null +++ b/lib/router/realms/session.ex @@ -0,0 +1,123 @@ +defmodule Wampex.Router.Realms.Session do + @moduledoc false + + @max_id 9_007_199_254_740_992 + + def register(db, realm, procedure, val) do + ClusterKV.put(db, realm, procedure, val) + ClusterKV.put(db, realm, "#{procedure}:callee_index", 0) + end + + def unregister(db, realm, {id, _from} = value, regs) do + {_id, proc} = reg = get_registration(id, regs) + ClusterKV.update(db, realm, proc, value, &upsert_remove/2) + List.delete(regs, reg) + end + + def callees(db, realm, procedure) do + with {_key, callees} <- ClusterKV.get(db, realm, procedure, 500), + {_key, [callee_index]} <- ClusterKV.get(db, realm, "#{procedure}:callee_index") do + {callees, callee_index} + end + end + + def round_robin(db, realm, procedure, len) do + ClusterKV.update(db, realm, "#{procedure}:callee_index", len, &update_round_robin/2) + end + + def subscribe(db, realm, topic, val, %{"match" => "wildcard"}) do + put_wildcard(db, realm, topic, val) + true + end + + def subscribe(db, realm, topic, val, _) do + ClusterKV.put(db, realm, topic, val) + false + end + + def unsubscribe(db, realm, {id, _from} = value, subscriptions) do + {_id, topic, wc} = sub = get_subscription(id, subscriptions) + + {key, filter} = + case wc do + true -> + key = get_wildcard_key(topic) + {key, &upsert_remove/2} + + false -> + {topic, &upsert_remove/2} + end + + ClusterKV.update(db, realm, key, value, filter) + remove_val(sub, subscriptions) + end + + def subscriptions(db, realm, topic) do + {_, _, subs} = + {db, {realm, topic}, []} + |> get_subscribers() + |> get_prefix() + |> get_wildcard() + + Enum.uniq(subs) + end + + def get_id do + Enum.random(0..@max_id) + end + + def get_request_id(current_id) when current_id == @max_id do + 1 + end + + def get_request_id(current_id) do + current_id + 1 + end + + defp update_round_robin({id, [values]}, value) do + case values + 1 do + ni when ni < value -> {id, [ni]} + _ -> {id, [0]} + end + end + + defp get_subscribers({db, {keyspace, key}, acc}) do + case ClusterKV.get(db, keyspace, key, 500) do + {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers} + _ -> {db, {keyspace, key}, acc} + end + end + + defp get_prefix({db, {keyspace, key}, acc}) do + l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500) + {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)} + end + + defp get_wildcard({db, {keyspace, key}, acc}) do + l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "") + {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)} + end + + defp get_registration(id, regs) do + Enum.find(regs, fn + {^id, _proc} -> true + _ -> false + end) + end + + defp get_subscription(id, subscriptions) do + Enum.find(subscriptions, fn + {^id, _topic, _} -> true + _ -> false + end) + end + + defp remove_val(val, values) do + List.delete(values, val) + end + + defp upsert_remove({id, values}, value), do: {id, remove_val(value, values)} + + defp put_wildcard(db, realm, topic, val), do: ClusterKV.put_wildcard(db, realm, topic, val, ".", ":", "") + defp get_wildcard_key(topic), do: ClusterKV.get_wildcard_key(topic, ".", ":", "") +end diff --git a/lib/router/session.ex b/lib/router/session.ex index 8406778..4dede66 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -4,8 +4,6 @@ defmodule Wampex.Router.Session do """ use StatesLanguage, data: "priv/router.json" - @max_id 9_007_199_254_740_992 - alias __MODULE__, as: Sess alias StatesLanguage, as: SL alias Wampex.Roles.{Broker, Caller, Dealer, Peer} @@ -14,6 +12,7 @@ defmodule Wampex.Router.Session do alias Broker.{Subscribed, Published, Event, Unsubscribed} alias Dealer.{Invocation, Registered, Result, Unregistered} alias Router.{Authentication, Realms} + alias Realms.Session, as: RealmSession @enforce_keys [:transport, :transport_pid] defstruct [ @@ -110,11 +109,7 @@ defmodule Wampex.Router.Session do # @handle_interrupt "HandleInterrupt" @handle_abort "HandleAbort" @handle_goodbye "HandleGoodbye" - # @handle_cancel "HandleCancel" - - def get_global_id do - Enum.random(0..@max_id) - end + # @handle_cancel "HandleCancel" @impl true def handle_resource( @@ -125,7 +120,7 @@ defmodule Wampex.Router.Session do ) do debug("Init") - {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]} + {:ok, %SL{data | data: %Sess{data.data | id: RealmSession.get_id()}}, [{:next_event, :internal, :transition}]} end @impl true @@ -258,18 +253,9 @@ defmodule Wampex.Router.Session do } = data } = sl ) do - id = get_global_id() + id = RealmSession.get_id() - wc = - case opts do - %{"match" => "wildcard"} -> - ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "") - true - - _ -> - ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}}) - false - end + wc = RealmSession.subscribe(db, realm, topic, {id, {self(), Node.self()}}, opts) send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t) @@ -293,7 +279,7 @@ defmodule Wampex.Router.Session do } } = sl ) do - subs = remove_subscription(subs, subscription_id, realm, db) + subs = RealmSession.unsubscribe(db, realm, {subscription_id, {self(), Node.self()}}, subs) send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t) @@ -316,15 +302,9 @@ defmodule Wampex.Router.Session do } } = sl ) do - pub_id = get_global_id() + pub_id = RealmSession.get_id() - {_, _, subs} = - {db, {realm, topic}, []} - |> get_subscribers() - |> get_prefix() - |> get_wildcard() - - subs = Enum.uniq(subs) + subs = RealmSession.subscriptions(db, realm, topic) Enum.each(subs, fn {id, {pid, node}} -> send( @@ -367,7 +347,7 @@ defmodule Wampex.Router.Session do } = data } = sl ) do - regs = remove_registration(regs, realm, registration_id, db) + regs = RealmSession.unregister(db, realm, {registration_id, {self(), Node.self()}}, regs) send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t) {:ok, %SL{sl | data: %Sess{data | registrations: regs}}} @@ -390,8 +370,8 @@ defmodule Wampex.Router.Session do } = data } = sl ) do - id = get_global_id() - ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}}) + id = RealmSession.get_id() + RealmSession.register(db, realm, procedure, {id, {self(), Node.self()}}) regd = %Registered{request_id: rid, registration_id: id} send_to_peer(Dealer.registered(regd), tt, t) @@ -418,9 +398,9 @@ defmodule Wampex.Router.Session do } = sl ) do data = - with {_key, callees} <- ClusterKV.get(db, realm, proc, 500), - {id, {pid, node}} <- get_live_callee(proxy, callees) do - req_id = get_request_id(ri) + with {callees, index} <- RealmSession.callees(db, realm, proc), + {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do + req_id = RealmSession.get_request_id(ri) dets = Map.put(dets, "procedure", proc) send( @@ -441,6 +421,8 @@ defmodule Wampex.Router.Session do } ) + RealmSession.round_robin(db, realm, proc, length(callees)) + %SL{sl | data: %Sess{data | request_id: req_id}} else {:error, :no_live_callees} -> @@ -562,6 +544,12 @@ defmodule Wampex.Router.Session do {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]} end + @impl true + def handle_info({:EXIT, t, reason}, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do + debug("Transport #{tt} exited because #{inspect(reason)}: Shutting down") + {:ok, data, [{:next_event, :internal, :abort}]} + end + @impl true def handle_info(event, state, data) do Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}") @@ -579,14 +567,14 @@ defmodule Wampex.Router.Session do subscriptions: subs } }) do - Logger.warn("Router session terminating #{inspect(reason)}") + Logger.debug("Router session terminating #{inspect(reason)}") Enum.each(regs, fn {id, _proc} -> - remove_registration(regs, realm, id, db) + RealmSession.unregister(db, realm, {id, {self(), Node.self()}}, regs) end) Enum.each(subs, fn {id, _proc, _} -> - remove_subscription(subs, id, realm, db) + RealmSession.unsubscribe(db, realm, {id, {self(), Node.self()}}, subs) end) send_to_peer( @@ -655,79 +643,24 @@ defmodule Wampex.Router.Session do end end - defp remove_subscription(subs, subscription_id, realm, db) do - {id, topic, wc} = - sub = - Enum.find(subs, fn - {^subscription_id, _topic, _} -> true - _ -> false - end) - - {key, filter} = - case wc do - true -> - key = ClusterKV.get_wildcard_key(topic, ".", ":", "") - - {key, - fn {id, values}, value -> - {id, - Enum.filter(values, fn - {_, ^value} -> false - _ -> true - end)} - end} - - false -> - {topic, - fn {id, values}, value -> - {id, List.delete(values, value)} - end} - end - - ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter) - List.delete(subs, sub) - end - - defp remove_registration(regs, realm, registration_id, db) do - {id, proc} = - reg = - Enum.find(regs, fn - {^registration_id, _proc} -> true - _ -> false - end) - - filter = fn {id, values}, value -> - {id, List.delete(values, value)} - end - - ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter) - List.delete(regs, reg) - end + defp get_live_callee(_proxy, [], _index, 0), do: {:error, :no_live_callees} - defp get_live_callee(_proxy, []), do: {:error, :no_live_callees} + defp get_live_callee(proxy, callees, index, tries) do + {_id, {pid, node}} = c = Enum.at(callees, index) - defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do case GenServer.call({proxy, node}, {:is_up, pid}) do - true -> c - false -> get_live_callee(proxy, t) - end - end - - defp get_subscribers({db, {keyspace, key}, acc}) do - case ClusterKV.get(db, keyspace, key, 500) do - {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers} - _ -> {db, {keyspace, key}, acc} - end - end + true -> + c - defp get_prefix({db, {keyspace, key}, acc}) do - l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500) - {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)} - end + false -> + index = + case index + 1 do + ni when ni < length(callees) -> ni + _ -> 0 + end - defp get_wildcard({db, {keyspace, key}, acc}) do - l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "") - {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)} + get_live_callee(proxy, callees, index, tries - 1) + end end defp send_to_peer(msg, transport, pid) do @@ -763,14 +696,6 @@ defmodule Wampex.Router.Session do Enum.reverse(message) end - defp get_request_id(current_id) when current_id == @max_id do - 1 - end - - defp get_request_id(current_id) do - current_id + 1 - end - defp handle_message(msg, roles) do Enum.reduce_while(roles, nil, fn r, _ -> try do diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex index 4cedd83..cf10e08 100644 --- a/lib/router/transports/web_socket.ex +++ b/lib/router/transports/web_socket.ex @@ -74,7 +74,7 @@ defmodule Wampex.Router.Transports.WebSocket do @impl true def terminate(reason, _req, _state) do - Logger.info("Websocket closing for reason #{inspect(reason)}") + Logger.debug("Websocket closing for reason #{inspect(reason)}") :ok end -- 2.45.3