From: Christopher Date: Mon, 16 Mar 2020 19:51:56 +0000 (-0500) Subject: better handling of errors and aborts X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=3b9d7be5d101a57aaccb37b4fdf67df8e3251ea4;p=wampex_client.git better handling of errors and aborts --- diff --git a/lib/client/session.ex b/lib/client/session.ex index 73402c3..5c55a1d 100644 --- a/lib/client/session.ex +++ b/lib/client/session.ex @@ -16,6 +16,8 @@ defmodule Wampex.Client.Session do alias Wampex.Client.Transports.WebSocket @yield 70 + @hello 1 + @abort 3 @enforce_keys [:url, :roles] @@ -393,6 +395,8 @@ defmodule Wampex.Client.Session do defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1) defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message} + defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message} + defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message} defp maybe_inject_request_id(r_id, message) do request_id = get_request_id(r_id) diff --git a/lib/roles/broker.ex b/lib/roles/broker.ex index 2f5e955..4d5daa2 100644 --- a/lib/roles/broker.ex +++ b/lib/roles/broker.ex @@ -104,12 +104,12 @@ defmodule Wampex.Roles.Broker do @impl true def handle([@publish, id, opts, topic]) do - handle([@publish, id, opts, topic, [], %{}]) + handle([@publish, id, opts, topic, nil, nil]) end @impl true def handle([@publish, id, opts, topic, arg_l]) do - handle([@publish, id, opts, topic, arg_l, %{}]) + handle([@publish, id, opts, topic, arg_l, nil]) end @impl true diff --git a/lib/roles/callee.ex b/lib/roles/callee.ex index abe0396..d88777a 100644 --- a/lib/roles/callee.ex +++ b/lib/roles/callee.ex @@ -80,12 +80,12 @@ defmodule Wampex.Roles.Callee do @impl true def handle([@invocation, id, reg_id, dets]) do - handle([@invocation, id, reg_id, dets, [], %{}]) + handle([@invocation, id, reg_id, dets, nil, nil]) end @impl true def handle([@invocation, id, reg_id, dets, arg_l]) do - handle([@invocation, id, dets, reg_id, arg_l, %{}]) + handle([@invocation, id, dets, reg_id, arg_l, nil]) end @impl true diff --git a/lib/roles/caller.ex b/lib/roles/caller.ex index 6e2fd2b..254a5ab 100644 --- a/lib/roles/caller.ex +++ b/lib/roles/caller.ex @@ -52,12 +52,12 @@ defmodule Wampex.Roles.Caller do @impl true def handle([@result, id, dets]) do - handle([@result, id, dets, [], %{}]) + handle([@result, id, dets, nil, nil]) end @impl true def handle([@result, id, dets, arg_l]) do - handle([@result, id, dets, arg_l, %{}]) + handle([@result, id, dets, arg_l, nil]) end @impl true diff --git a/lib/roles/dealer.ex b/lib/roles/dealer.ex index 61bab0b..5bd7129 100644 --- a/lib/roles/dealer.ex +++ b/lib/roles/dealer.ex @@ -112,12 +112,12 @@ defmodule Wampex.Roles.Dealer do @impl true def handle([@call, id, dets, proc]) do - handle([@call, id, dets, proc, [], %{}]) + handle([@call, id, dets, proc, nil, nil]) end @impl true def handle([@call, id, dets, proc, arg_l]) do - handle([@call, id, dets, proc, arg_l, %{}]) + handle([@call, id, dets, proc, arg_l, nil]) end @impl true @@ -128,12 +128,12 @@ defmodule Wampex.Roles.Dealer do @impl true def handle([@yield, id, dets]) do - handle([@yield, id, dets, [], %{}]) + handle([@yield, id, dets, nil, nil]) end @impl true def handle([@yield, id, dets, arg_l]) do - handle([@yield, id, dets, arg_l, %{}]) + handle([@yield, id, dets, arg_l, nil]) end @impl true diff --git a/lib/roles/peer.ex b/lib/roles/peer.ex index 714bb77..13c581e 100644 --- a/lib/roles/peer.ex +++ b/lib/roles/peer.ex @@ -70,6 +70,17 @@ defmodule Wampex.Roles.Peer do } end + defmodule Abort do + @moduledoc false + @enforce_keys [:reason] + defstruct [:reason, details: %{}] + + @type t :: %__MODULE__{ + reason: String.t(), + details: map() + } + end + @impl true def add(roles), do: roles @@ -86,6 +97,11 @@ defmodule Wampex.Roles.Peer do [@welcome, si, opts] end + @spec abort(Abort.t()) :: Wampex.message() + def abort(%Abort{reason: reason, details: opts}) do + [@abort, opts, reason] + end + @spec challenge(Challenge.t()) :: Wampex.message() def challenge(%Challenge{auth_method: am, options: opts}) do [@challenge, am, opts] @@ -134,12 +150,12 @@ defmodule Wampex.Roles.Peer do @impl true def handle([@error, type, id, dets, error]) do - handle([@error, type, id, dets, error, [], %{}]) + handle([@error, type, id, dets, error, nil, nil]) end @impl true def handle([@error, type, id, dets, error, arg_l]) do - handle([@error, type, id, dets, error, arg_l, %{}]) + handle([@error, type, id, dets, error, arg_l, nil]) end @impl true diff --git a/lib/roles/subscriber.ex b/lib/roles/subscriber.ex index ed30fdc..dfced57 100644 --- a/lib/roles/subscriber.ex +++ b/lib/roles/subscriber.ex @@ -60,12 +60,12 @@ defmodule Wampex.Roles.Subscriber do @impl true def handle([@event, sub_id, pub_id, dets]) do - handle([@event, sub_id, pub_id, dets, [], %{}]) + handle([@event, sub_id, pub_id, dets, nil, nil]) end @impl true def handle([@event, sub_id, pub_id, dets, arg_l]) do - handle([@event, sub_id, pub_id, dets, arg_l, %{}]) + handle([@event, sub_id, pub_id, dets, arg_l, nil]) end @impl true diff --git a/lib/router/session.ex b/lib/router/session.ex index fda6cab..2f515c4 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -9,7 +9,7 @@ defmodule Wampex.Router.Session do alias StatesLanguage, as: SL alias Wampex.Realm alias Wampex.Roles.{Broker, Dealer, Peer} - alias Wampex.Roles.Peer.{Challenge, Welcome} + alias Wampex.Roles.Peer.{Challenge, Welcome, Abort} alias Wampex.Router alias __MODULE__, as: Sess alias Broker.{Subscribed, Published, Event} @@ -43,7 +43,8 @@ defmodule Wampex.Router.Session do invocations: [], message_queue: [], request_id: 0, - requests: [] + requests: [], + hello_received: false ] @type t :: %__MODULE__{ @@ -70,7 +71,8 @@ defmodule Wampex.Router.Session do invocations: [], message_queue: [], request_id: integer(), - requests: [] + requests: [], + hello_received: boolean() } ## States @@ -113,7 +115,7 @@ defmodule Wampex.Router.Session do @handle_init, _, @init, - data + %SL{data: %Sess{transport_pid: t}} = data ) do debug("Init") @@ -140,9 +142,18 @@ defmodule Wampex.Router.Session do %SL{data: %Sess{message: msg, roles: roles}} = data ) do debug("Handling Message #{inspect(msg)}") - {actions, id, response} = handle_message(msg, roles) - {data, response} = maybe_update_response(data, response) - debug("Response: #{inspect(response)}") + + {data, actions} = + case handle_message(msg, roles) do + {actions, id, response} -> + {data, response} = maybe_update_response(data, response) + debug("Response: #{inspect(response)}") + {data, actions} + + nil -> + {data, [{:next_event, :internal, :abort}]} + end + {:ok, data, actions} end @@ -157,17 +168,41 @@ defmodule Wampex.Router.Session do id: id, transport: tt, transport_pid: t, - hello: {:hello, realm, dets} + hello: {:hello, realm, dets}, + hello_received: false } = data } = sl ) do # TODO check for authentication request - tt.send_request(t, Peer.welcome(%Welcome{session_id: id})) + send_to_peer(Peer.welcome(%Welcome{session_id: id}), tt, t) - {:ok, %SL{sl | data: %Sess{data | realm: realm, peer_information: dets}}, + {:ok, + %SL{sl | data: %Sess{data | hello_received: true, realm: realm, peer_information: dets}}, [{:next_event, :internal, :transition}]} end + @impl true + def handle_resource( + @handle_hello, + _, + @hello, + %SL{ + data: + %Sess{ + hello_received: true + } = data + } = sl + ) do + {:ok, + %SL{ + sl + | data: %Sess{ + data + | error: "wamp.error.protocol_violation" + } + }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]} + end + @impl true def handle_resource( @handle_subscribe, @@ -359,15 +394,6 @@ defmodule Wampex.Router.Session do {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions} end - defp get_live_callee(proxy, []), do: {:error, :no_live_callees} - - defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do - case GenServer.call({proxy, node}, {:is_up, pid}) do - true -> c - false -> get_live_callee(proxy, t) - end - end - @impl true def handle_resource( @handle_yield, @@ -398,7 +424,13 @@ defmodule Wampex.Router.Session do end @impl true - def handle_resource(@handle_abort, _, @abort, data) do + def handle_resource( + @handle_abort, + _, + @abort, + %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data + ) do + send_to_peer(Peer.abort(%Abort{reason: er}), tt, t) Logger.warn("Aborting: #{data.data.error}") {:ok, data, []} end @@ -457,7 +489,13 @@ defmodule Wampex.Router.Session do @impl true def handle_termination(reason, _, %SL{ - data: %Sess{db: db, registrations: regs, subscriptions: subs} + data: %Sess{ + db: db, + transport: tt, + transport_pid: t, + registrations: regs, + subscriptions: subs + } }) do filter = fn {id, values}, value -> {id, List.delete(values, value)} @@ -468,6 +506,21 @@ defmodule Wampex.Router.Session do Enum.each(regs, fn {id, proc} -> ClusterKV.update(db, proc, {id, me}, filter) end) + + send_to_peer( + Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}), + tt, + t + ) + end + + defp get_live_callee(proxy, []), do: {:error, :no_live_callees} + + defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do + case GenServer.call({proxy, node}, {:is_up, pid}) do + true -> c + false -> get_live_callee(proxy, t) + end end defp get_subscribers({db, key, acc}) do diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex index 02173cb..88521d7 100644 --- a/lib/router/transports/web_socket.ex +++ b/lib/router/transports/web_socket.ex @@ -45,10 +45,26 @@ defmodule Wampex.Router.Transports.WebSocket do end @impl true - def websocket_info(_event, state) do + def websocket_info( + {:EXIT, _pid = session_pid, reason}, + %WebSocket{session: session_pid} = state + ) do + Logger.warn("Session ended because #{inspect(reason)}, closing connection") + {:stop, state} + end + + @impl true + def websocket_info(event, state) do + Logger.debug("Received unhandled event: #{inspect(event)}") {:ok, state} end + @impl true + def terminate(reason, _req, _state) do + Logger.info("Websocket closing for reason #{inspect(reason)}") + :ok + end + defp handle_payload({payload, %WebSocket{serializer: s} = state}) do payload |> s.deserialize!() diff --git a/test/wampex_test.exs b/test/wampex_test.exs index a559616..30cd19a 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -19,7 +19,7 @@ defmodule WampexTest do @url "ws://localhost:4000/ws" @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"} - @realm %Realm{name: "myrealm"} + @realm %Realm{name: "org.entropealabs.iot"} @roles [Callee, Caller, Publisher, Subscriber] @device "as987d9a8sd79a87ds" @@ -287,7 +287,7 @@ defmodule WampexTest do callee_name = TestAbort {:ok, pid} = Client.start_link(name: callee_name, session: @session) Client.cast_send_request(callee_name, Peer.hello(%Hello{realm: "test", roles: [Callee]})) - assert_receive {:EXIT, ^pid, :shutdown} + assert_receive {:EXIT, ^pid, :shutdown}, 1000 end @tag :client