db: db,
authorization_module: am,
peer: peer,
+ id: session_id,
publish: %Publish{request_id: rid, options: opts, topic: topic, arg_list: arg_l, arg_kw: arg_kw}
}
} = sl
subs = RealmSession.subscriptions(db, realm, topic)
details = Map.put_new(opts, "topic", topic)
+ details = Map.put_new(details, "session_id", session_id)
groups =
Enum.reduce(subs, %{}, fn {id, {pid, node}}, acc ->
realm: realm,
request_id: ri,
authorization_module: am,
+ id: session_id,
peer: peer,
call: %Call{request_id: call_id, options: opts, procedure: proc, arg_list: al, arg_kw: akw}
} = data
{id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do
req_id = RealmSession.get_request_id(ri)
opts = Map.put(opts, "procedure", proc)
+ opts = Map.put(opts, "session_id", session_id)
send(
{proxy, node},
@impl true
def handle_termination(_reason, _, %SL{
data: %Sess{
+ id: session_id,
db: db,
registrations: regs,
realm: realm,
)
end)
+ pub_id = RealmSession.get_id()
+ topic = "router.peer.disconnect"
+ d_subs = RealmSession.subscriptions(db, realm, topic)
+ details = %{topic: topic, session_id: session_id}
+
+ groups =
+ Enum.reduce(d_subs, %{}, fn {id, {pid, node}}, acc ->
+ event = {{:event, id, pub_id, [], %{}, details}, pid}
+ Map.update(acc, node, [event], &[event | &1])
+ end)
+
+ Enum.each(groups, fn {node, messages} ->
+ send({proxy, node}, messages)
+ end)
+
Enum.each(regs, fn {id, _proc} ->
RealmSession.unregister(db, realm, {id, {self(), Node.self()}}, regs)
end)