From 8ba1770a874b639e91ac4f5e1703d3c000100561 Mon Sep 17 00:00:00 2001 From: Christopher Date: Thu, 23 Apr 2020 16:04:19 -0500 Subject: [PATCH] make session processes temporary, broadcast disconnect --- lib/router/realms.ex | 27 ++++++++++++++++----------- lib/router/session.ex | 20 ++++++++++++++++++++ 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/lib/router/realms.ex b/lib/router/realms.ex index 9baa3ce..b961570 100644 --- a/lib/router/realms.ex +++ b/lib/router/realms.ex @@ -31,17 +31,22 @@ defmodule Wampex.Router.Realms do DynamicSupervisor.start_child( s_name, - {Session, - [ - %Session{ - db: db, - name: name, - authentication_module: atm, - authorization_module: azm, - transport: transport, - transport_pid: socket - } - ]} + %{ + id: Session, + start: + {Session, :start_link, + [ + %Session{ + db: db, + name: name, + authentication_module: atm, + authorization_module: azm, + transport: transport, + transport_pid: socket + } + ]}, + restart: :temporary + } ) end diff --git a/lib/router/session.ex b/lib/router/session.ex index b154db6..7bf57bd 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -299,6 +299,7 @@ defmodule Wampex.Router.Session do db: db, authorization_module: am, peer: peer, + id: session_id, publish: %Publish{request_id: rid, options: opts, topic: topic, arg_list: arg_l, arg_kw: arg_kw} } } = sl @@ -310,6 +311,7 @@ defmodule Wampex.Router.Session do subs = RealmSession.subscriptions(db, realm, topic) details = Map.put_new(opts, "topic", topic) + details = Map.put_new(details, "session_id", session_id) groups = Enum.reduce(subs, %{}, fn {id, {pid, node}}, acc -> @@ -415,6 +417,7 @@ defmodule Wampex.Router.Session do realm: realm, request_id: ri, authorization_module: am, + id: session_id, peer: peer, call: %Call{request_id: call_id, options: opts, procedure: proc, arg_list: al, arg_kw: akw} } = data @@ -427,6 +430,7 @@ defmodule Wampex.Router.Session do {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do req_id = RealmSession.get_request_id(ri) opts = Map.put(opts, "procedure", proc) + opts = Map.put(opts, "session_id", session_id) send( {proxy, node}, @@ -642,6 +646,7 @@ defmodule Wampex.Router.Session do @impl true def handle_termination(_reason, _, %SL{ data: %Sess{ + id: session_id, db: db, registrations: regs, realm: realm, @@ -657,6 +662,21 @@ defmodule Wampex.Router.Session do ) end) + pub_id = RealmSession.get_id() + topic = "router.peer.disconnect" + d_subs = RealmSession.subscriptions(db, realm, topic) + details = %{topic: topic, session_id: session_id} + + groups = + Enum.reduce(d_subs, %{}, fn {id, {pid, node}}, acc -> + event = {{:event, id, pub_id, [], %{}, details}, pid} + Map.update(acc, node, [event], &[event | &1]) + end) + + Enum.each(groups, fn {node, messages} -> + send({proxy, node}, messages) + end) + Enum.each(regs, fn {id, _proc} -> RealmSession.unregister(db, realm, {id, {self(), Node.self()}}, regs) end) -- 2.45.3