:subscribe,
:unsubscribe,
:publish,
- :error,
:realm,
:name,
:goodbye,
:peer_information,
+ error: "wamp.error.protocol_violation",
roles: [Peer, Broker, Dealer],
registrations: [],
subscriptions: [],
} = sl
) do
# TODO check for authentication request
- send_to_peer(Peer.welcome(%Welcome{session_id: id}), tt, t)
+ send_to_peer(
+ Peer.welcome(%Welcome{
+ session_id: id,
+ options: %{agent: "WAMPex Router", roles: %{broker: %{}, dealer: %{}}}
+ }),
+ tt,
+ t
+ )
{:ok,
%SL{sl | data: %Sess{data | hello_received: true, realm: realm, peer_information: dets}},
ClusterKV.put(db, key, {id, {self(), Node.self()}})
end
- tt.send_request(t, Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}))
+ send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
{:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}},
[{:next_event, :internal, :transition}]}
transport: tt,
transport_pid: t,
db: db,
- publish: {:publish, id, opts, topic, arg_l, arg_kw}
+ publish: {:publish, rid, opts, topic, arg_l, arg_kw}
}
} = sl
) do
)
end)
+ case opts do
+ %{acknowledge: true} ->
+ send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t)
+
+ %{} ->
+ :noop
+ end
+
{:ok, sl, [{:next_event, :internal, :transition}]}
end
data:
%Sess{
db: db,
+ transport: tt,
+ transport_pid: t,
registrations: regs,
unregister: {:unregister, request_id, registration_id}
} = data
} = sl
) do
- {id, proc} =
- reg =
- Enum.find(regs, fn
- {^registration_id, proc} -> true
- _ -> false
- end)
+ regs = remove_registration(regs, registration_id, db)
- filter = fn {id, values}, value ->
- {id, List.delete(values, value)}
- end
-
- ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
- {:ok, %SL{sl | data: %Sess{data | registrations: List.delete(regs, reg)}}}
+ send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
+ {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
end
@impl true
debug("Handling Registration #{inspect(key)}")
id = get_global_id()
ClusterKV.put(db, key, {id, {self(), Node.self()}})
- tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id}))
+ send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t)
{:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
[{:next_event, :internal, :transition}]}
key = "#{realm}:#{proc}"
{_key, callees} = ClusterKV.get(db, key, 500)
- req_id = get_request_id(ri)
-
- actions =
+ {data, actions} =
case get_live_callee(proxy, callees) do
{id, {pid, node}} ->
+ req_id = get_request_id(ri)
+
send(
{proxy, node},
{
}
)
- [{:next_event, :internal, :transition}]
+ {%SL{sl | data: %Sess{data | request_id: req_id}},
+ [{:next_event, :internal, :transition}]}
- {:error, :no_live_callees} = er ->
- [{:next_event, :internal, er}]
+ {:error, :no_live_callees} ->
+ {%SL{sl | data: %Sess{data | error: "wamp.error.no_callees_registered"}},
+ [{:next_event, :internal, :transition}, {:next_event, :internal, :error}]}
end
- {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions}
+ {:ok, data, actions}
end
@impl true
subscriptions: subs
}
}) do
- filter = fn {id, values}, value ->
- {id, List.delete(values, value)}
- end
-
- me = {self(), Node.self()}
-
Enum.each(regs, fn {id, proc} ->
- ClusterKV.update(db, proc, {id, me}, filter)
+ remove_registration(regs, id, db)
end)
send_to_peer(
)
end
+ defp remove_registration(regs, registration_id, db) do
+ {id, proc} =
+ reg =
+ Enum.find(regs, fn
+ {^registration_id, proc} -> true
+ _ -> false
+ end)
+
+ filter = fn {id, values}, value ->
+ {id, List.delete(values, value)}
+ end
+
+ ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
+ List.delete(regs, reg)
+ end
+
defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do