]> Entropealabs - wampex_router.git/commitdiff
make session processes temporary, broadcast disconnect
authorChristopher <chris@entropealabs.com>
Thu, 23 Apr 2020 21:04:19 +0000 (16:04 -0500)
committerChristopher <chris@entropealabs.com>
Thu, 23 Apr 2020 21:04:19 +0000 (16:04 -0500)
lib/router/realms.ex
lib/router/session.ex

index 9baa3ce4be7c62e5b59fd3053fb71788f3b81565..b961570e91ab68dbaae3c18c3d35057f02d8f779 100644 (file)
@@ -31,17 +31,22 @@ defmodule Wampex.Router.Realms do
 
     DynamicSupervisor.start_child(
       s_name,
-      {Session,
-       [
-         %Session{
-           db: db,
-           name: name,
-           authentication_module: atm,
-           authorization_module: azm,
-           transport: transport,
-           transport_pid: socket
-         }
-       ]}
+      %{
+        id: Session,
+        start:
+          {Session, :start_link,
+           [
+             %Session{
+               db: db,
+               name: name,
+               authentication_module: atm,
+               authorization_module: azm,
+               transport: transport,
+               transport_pid: socket
+             }
+           ]},
+        restart: :temporary
+      }
     )
   end
 
index b154db6e935490889ca5715440a412e1c7ce2666..7bf57bdced3928e908c3411e897f3315a5ae1a71 100644 (file)
@@ -299,6 +299,7 @@ defmodule Wampex.Router.Session do
             db: db,
             authorization_module: am,
             peer: peer,
+            id: session_id,
             publish: %Publish{request_id: rid, options: opts, topic: topic, arg_list: arg_l, arg_kw: arg_kw}
           }
         } = sl
@@ -310,6 +311,7 @@ defmodule Wampex.Router.Session do
 
           subs = RealmSession.subscriptions(db, realm, topic)
           details = Map.put_new(opts, "topic", topic)
+          details = Map.put_new(details, "session_id", session_id)
 
           groups =
             Enum.reduce(subs, %{}, fn {id, {pid, node}}, acc ->
@@ -415,6 +417,7 @@ defmodule Wampex.Router.Session do
               realm: realm,
               request_id: ri,
               authorization_module: am,
+              id: session_id,
               peer: peer,
               call: %Call{request_id: call_id, options: opts, procedure: proc, arg_list: al, arg_kw: akw}
             } = data
@@ -427,6 +430,7 @@ defmodule Wampex.Router.Session do
                {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do
             req_id = RealmSession.get_request_id(ri)
             opts = Map.put(opts, "procedure", proc)
+            opts = Map.put(opts, "session_id", session_id)
 
             send(
               {proxy, node},
@@ -642,6 +646,7 @@ defmodule Wampex.Router.Session do
   @impl true
   def handle_termination(_reason, _, %SL{
         data: %Sess{
+          id: session_id,
           db: db,
           registrations: regs,
           realm: realm,
@@ -657,6 +662,21 @@ defmodule Wampex.Router.Session do
       )
     end)
 
+    pub_id = RealmSession.get_id()
+    topic = "router.peer.disconnect"
+    d_subs = RealmSession.subscriptions(db, realm, topic)
+    details = %{topic: topic, session_id: session_id}
+
+    groups =
+      Enum.reduce(d_subs, %{}, fn {id, {pid, node}}, acc ->
+        event = {{:event, id, pub_id, [], %{}, details}, pid}
+        Map.update(acc, node, [event], &[event | &1])
+      end)
+
+    Enum.each(groups, fn {node, messages} ->
+      send({proxy, node}, messages)
+    end)
+
     Enum.each(regs, fn {id, _proc} ->
       RealmSession.unregister(db, realm, {id, {self(), Node.self()}}, regs)
     end)