alias Wampex.Client.Transports.WebSocket
@yield 70
+ @hello 1
+ @abort 3
@enforce_keys [:url, :roles]
defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
+ defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message}
+ defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message}
defp maybe_inject_request_id(r_id, message) do
request_id = get_request_id(r_id)
@impl true
def handle([@publish, id, opts, topic]) do
- handle([@publish, id, opts, topic, [], %{}])
+ handle([@publish, id, opts, topic, nil, nil])
end
@impl true
def handle([@publish, id, opts, topic, arg_l]) do
- handle([@publish, id, opts, topic, arg_l, %{}])
+ handle([@publish, id, opts, topic, arg_l, nil])
end
@impl true
@impl true
def handle([@invocation, id, reg_id, dets]) do
- handle([@invocation, id, reg_id, dets, [], %{}])
+ handle([@invocation, id, reg_id, dets, nil, nil])
end
@impl true
def handle([@invocation, id, reg_id, dets, arg_l]) do
- handle([@invocation, id, dets, reg_id, arg_l, %{}])
+ handle([@invocation, id, dets, reg_id, arg_l, nil])
end
@impl true
@impl true
def handle([@result, id, dets]) do
- handle([@result, id, dets, [], %{}])
+ handle([@result, id, dets, nil, nil])
end
@impl true
def handle([@result, id, dets, arg_l]) do
- handle([@result, id, dets, arg_l, %{}])
+ handle([@result, id, dets, arg_l, nil])
end
@impl true
@impl true
def handle([@call, id, dets, proc]) do
- handle([@call, id, dets, proc, [], %{}])
+ handle([@call, id, dets, proc, nil, nil])
end
@impl true
def handle([@call, id, dets, proc, arg_l]) do
- handle([@call, id, dets, proc, arg_l, %{}])
+ handle([@call, id, dets, proc, arg_l, nil])
end
@impl true
@impl true
def handle([@yield, id, dets]) do
- handle([@yield, id, dets, [], %{}])
+ handle([@yield, id, dets, nil, nil])
end
@impl true
def handle([@yield, id, dets, arg_l]) do
- handle([@yield, id, dets, arg_l, %{}])
+ handle([@yield, id, dets, arg_l, nil])
end
@impl true
}
end
+ defmodule Abort do
+ @moduledoc false
+ @enforce_keys [:reason]
+ defstruct [:reason, details: %{}]
+
+ @type t :: %__MODULE__{
+ reason: String.t(),
+ details: map()
+ }
+ end
+
@impl true
def add(roles), do: roles
[@welcome, si, opts]
end
+ @spec abort(Abort.t()) :: Wampex.message()
+ def abort(%Abort{reason: reason, details: opts}) do
+ [@abort, opts, reason]
+ end
+
@spec challenge(Challenge.t()) :: Wampex.message()
def challenge(%Challenge{auth_method: am, options: opts}) do
[@challenge, am, opts]
@impl true
def handle([@error, type, id, dets, error]) do
- handle([@error, type, id, dets, error, [], %{}])
+ handle([@error, type, id, dets, error, nil, nil])
end
@impl true
def handle([@error, type, id, dets, error, arg_l]) do
- handle([@error, type, id, dets, error, arg_l, %{}])
+ handle([@error, type, id, dets, error, arg_l, nil])
end
@impl true
@impl true
def handle([@event, sub_id, pub_id, dets]) do
- handle([@event, sub_id, pub_id, dets, [], %{}])
+ handle([@event, sub_id, pub_id, dets, nil, nil])
end
@impl true
def handle([@event, sub_id, pub_id, dets, arg_l]) do
- handle([@event, sub_id, pub_id, dets, arg_l, %{}])
+ handle([@event, sub_id, pub_id, dets, arg_l, nil])
end
@impl true
alias StatesLanguage, as: SL
alias Wampex.Realm
alias Wampex.Roles.{Broker, Dealer, Peer}
- alias Wampex.Roles.Peer.{Challenge, Welcome}
+ alias Wampex.Roles.Peer.{Challenge, Welcome, Abort}
alias Wampex.Router
alias __MODULE__, as: Sess
alias Broker.{Subscribed, Published, Event}
invocations: [],
message_queue: [],
request_id: 0,
- requests: []
+ requests: [],
+ hello_received: false
]
@type t :: %__MODULE__{
invocations: [],
message_queue: [],
request_id: integer(),
- requests: []
+ requests: [],
+ hello_received: boolean()
}
## States
@handle_init,
_,
@init,
- data
+ %SL{data: %Sess{transport_pid: t}} = data
) do
debug("Init")
%SL{data: %Sess{message: msg, roles: roles}} = data
) do
debug("Handling Message #{inspect(msg)}")
- {actions, id, response} = handle_message(msg, roles)
- {data, response} = maybe_update_response(data, response)
- debug("Response: #{inspect(response)}")
+
+ {data, actions} =
+ case handle_message(msg, roles) do
+ {actions, id, response} ->
+ {data, response} = maybe_update_response(data, response)
+ debug("Response: #{inspect(response)}")
+ {data, actions}
+
+ nil ->
+ {data, [{:next_event, :internal, :abort}]}
+ end
+
{:ok, data, actions}
end
id: id,
transport: tt,
transport_pid: t,
- hello: {:hello, realm, dets}
+ hello: {:hello, realm, dets},
+ hello_received: false
} = data
} = sl
) do
# TODO check for authentication request
- tt.send_request(t, Peer.welcome(%Welcome{session_id: id}))
+ send_to_peer(Peer.welcome(%Welcome{session_id: id}), tt, t)
- {:ok, %SL{sl | data: %Sess{data | realm: realm, peer_information: dets}},
+ {:ok,
+ %SL{sl | data: %Sess{data | hello_received: true, realm: realm, peer_information: dets}},
[{:next_event, :internal, :transition}]}
end
+ @impl true
+ def handle_resource(
+ @handle_hello,
+ _,
+ @hello,
+ %SL{
+ data:
+ %Sess{
+ hello_received: true
+ } = data
+ } = sl
+ ) do
+ {:ok,
+ %SL{
+ sl
+ | data: %Sess{
+ data
+ | error: "wamp.error.protocol_violation"
+ }
+ }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]}
+ end
+
@impl true
def handle_resource(
@handle_subscribe,
{:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions}
end
- defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
-
- defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
- case GenServer.call({proxy, node}, {:is_up, pid}) do
- true -> c
- false -> get_live_callee(proxy, t)
- end
- end
-
@impl true
def handle_resource(
@handle_yield,
end
@impl true
- def handle_resource(@handle_abort, _, @abort, data) do
+ def handle_resource(
+ @handle_abort,
+ _,
+ @abort,
+ %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data
+ ) do
+ send_to_peer(Peer.abort(%Abort{reason: er}), tt, t)
Logger.warn("Aborting: #{data.data.error}")
{:ok, data, []}
end
@impl true
def handle_termination(reason, _, %SL{
- data: %Sess{db: db, registrations: regs, subscriptions: subs}
+ data: %Sess{
+ db: db,
+ transport: tt,
+ transport_pid: t,
+ registrations: regs,
+ subscriptions: subs
+ }
}) do
filter = fn {id, values}, value ->
{id, List.delete(values, value)}
Enum.each(regs, fn {id, proc} ->
ClusterKV.update(db, proc, {id, me}, filter)
end)
+
+ send_to_peer(
+ Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
+ tt,
+ t
+ )
+ end
+
+ defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
+
+ defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
+ case GenServer.call({proxy, node}, {:is_up, pid}) do
+ true -> c
+ false -> get_live_callee(proxy, t)
+ end
end
defp get_subscribers({db, key, acc}) do
end
@impl true
- def websocket_info(_event, state) do
+ def websocket_info(
+ {:EXIT, _pid = session_pid, reason},
+ %WebSocket{session: session_pid} = state
+ ) do
+ Logger.warn("Session ended because #{inspect(reason)}, closing connection")
+ {:stop, state}
+ end
+
+ @impl true
+ def websocket_info(event, state) do
+ Logger.debug("Received unhandled event: #{inspect(event)}")
{:ok, state}
end
+ @impl true
+ def terminate(reason, _req, _state) do
+ Logger.info("Websocket closing for reason #{inspect(reason)}")
+ :ok
+ end
+
defp handle_payload({payload, %WebSocket{serializer: s} = state}) do
payload
|> s.deserialize!()
@url "ws://localhost:4000/ws"
@auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"}
- @realm %Realm{name: "myrealm"}
+ @realm %Realm{name: "org.entropealabs.iot"}
@roles [Callee, Caller, Publisher, Subscriber]
@device "as987d9a8sd79a87ds"
callee_name = TestAbort
{:ok, pid} = Client.start_link(name: callee_name, session: @session)
Client.cast_send_request(callee_name, Peer.hello(%Hello{realm: "test", roles: [Callee]}))
- assert_receive {:EXIT, ^pid, :shutdown}
+ assert_receive {:EXIT, ^pid, :shutdown}, 1000
end
@tag :client