wc = RealmSession.subscribe(db, realm, topic, {id, {self(), Node.self()}}, opts)
send_to_peer(Broker.subscribed(%Subscribed{request_id: rid, subscription_id: id}), tt, t)
+ Logger.info("Subscription #{id}: #{topic}")
+
{%SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}},
[{:next_event, :internal, :transition}]}
subs = RealmSession.unsubscribe(db, realm, {subscription_id, {self(), Node.self()}}, subs)
send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
+ Logger.info("Unsubscribe: #{subscription_id}")
{:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, [{:next_event, :internal, :transition}]}
end
:noop
end
+ Logger.info("Published #{pub_id}: #{topic}")
{sl, [{:next_event, :internal, :transition}]}
false ->
regs = RealmSession.unregister(db, realm, {registration_id, {self(), Node.self()}}, regs)
send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
+ Logger.info("Unregister: #{registration_id}")
{:ok, %SL{sl | data: %Sess{data | registrations: regs}}, [{:next_event, :internal, :transition}]}
end
regd = %Registered{request_id: rid, registration_id: id}
send_to_peer(Dealer.registered(regd), tt, t)
+ Logger.info("Registered #{id}: #{procedure}")
+
{%SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
[{:next_event, :internal, :transition}]}
RealmSession.round_robin(db, realm, proc, length(callees))
+ Logger.info("Call #{req_id}: #{proc}")
{%SL{sl | data: %Sess{data | request_id: req_id}}, [{:next_event, :internal, :transition}]}
else
{:error, :no_live_callees} ->
_ -> true
end)
+ Logger.info("Yield #{id}: #{get_in(opts, ["procedure"])}")
{:ok, %SL{data | data: %Sess{data.data | invocations: inv, yield: nil}}, [{:next_event, :internal, :transition}]}
end
_ -> true
end)
+ Logger.info("Call Error #{id}: #{get_in(dets, ["procedure"])}")
{:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, [{:next_event, :internal, :transition}]}
end
%SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data
) do
send_to_peer(Peer.abort(%Abort{reason: er}), tt, t)
+
+ Logger.info("Abort")
{:ok, data, []}
end
%SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
) do
send_to_peer(Peer.goodbye(goodbye), tt, t)
+
+ Logger.info("Goodbye")
{:ok, data, []}
end