]> Entropealabs - wampex_client.git/commitdiff
better handling of errors and aborts
authorChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 19:51:56 +0000 (14:51 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 19:51:56 +0000 (14:51 -0500)
lib/client/session.ex
lib/roles/broker.ex
lib/roles/callee.ex
lib/roles/caller.ex
lib/roles/dealer.ex
lib/roles/peer.ex
lib/roles/subscriber.ex
lib/router/session.ex
lib/router/transports/web_socket.ex
test/wampex_test.exs

index 73402c39d1908a714c3ca73f2b1f9a7fdef4f3bc..5c55a1de260a9279cc40b6d2d46931df3a37c309 100644 (file)
@@ -16,6 +16,8 @@ defmodule Wampex.Client.Session do
   alias Wampex.Client.Transports.WebSocket
 
   @yield 70
+  @hello 1
+  @abort 3
 
   @enforce_keys [:url, :roles]
 
@@ -393,6 +395,8 @@ defmodule Wampex.Client.Session do
   defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
 
   defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
+  defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message}
+  defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message}
 
   defp maybe_inject_request_id(r_id, message) do
     request_id = get_request_id(r_id)
index 2f5e955eeb4a037048d1af735f3a208695f1a722..4d5daa23b3b9919762e73945ac016ca5c8424a06 100644 (file)
@@ -104,12 +104,12 @@ defmodule Wampex.Roles.Broker do
 
   @impl true
   def handle([@publish, id, opts, topic]) do
-    handle([@publish, id, opts, topic, [], %{}])
+    handle([@publish, id, opts, topic, nil, nil])
   end
 
   @impl true
   def handle([@publish, id, opts, topic, arg_l]) do
-    handle([@publish, id, opts, topic, arg_l, %{}])
+    handle([@publish, id, opts, topic, arg_l, nil])
   end
 
   @impl true
index abe039684b0db92920137ff2d92ed063379ea557..d88777af65582a4e1ca4677b28f3e46191eba1cf 100644 (file)
@@ -80,12 +80,12 @@ defmodule Wampex.Roles.Callee do
 
   @impl true
   def handle([@invocation, id, reg_id, dets]) do
-    handle([@invocation, id, reg_id, dets, [], %{}])
+    handle([@invocation, id, reg_id, dets, nil, nil])
   end
 
   @impl true
   def handle([@invocation, id, reg_id, dets, arg_l]) do
-    handle([@invocation, id, dets, reg_id, arg_l, %{}])
+    handle([@invocation, id, dets, reg_id, arg_l, nil])
   end
 
   @impl true
index 6e2fd2bf09fb422ef925fe18c94ebac97a9af17a..254a5abd7cb95bda531ec832071961b3edc25b45 100644 (file)
@@ -52,12 +52,12 @@ defmodule Wampex.Roles.Caller do
 
   @impl true
   def handle([@result, id, dets]) do
-    handle([@result, id, dets, [], %{}])
+    handle([@result, id, dets, nil, nil])
   end
 
   @impl true
   def handle([@result, id, dets, arg_l]) do
-    handle([@result, id, dets, arg_l, %{}])
+    handle([@result, id, dets, arg_l, nil])
   end
 
   @impl true
index 61bab0b912c5201c548b4f2a6e61d25d531d27b1..5bd712925cde9821267b84dbbb758c03ba8089b5 100644 (file)
@@ -112,12 +112,12 @@ defmodule Wampex.Roles.Dealer do
 
   @impl true
   def handle([@call, id, dets, proc]) do
-    handle([@call, id, dets, proc, [], %{}])
+    handle([@call, id, dets, proc, nil, nil])
   end
 
   @impl true
   def handle([@call, id, dets, proc, arg_l]) do
-    handle([@call, id, dets, proc, arg_l, %{}])
+    handle([@call, id, dets, proc, arg_l, nil])
   end
 
   @impl true
@@ -128,12 +128,12 @@ defmodule Wampex.Roles.Dealer do
 
   @impl true
   def handle([@yield, id, dets]) do
-    handle([@yield, id, dets, [], %{}])
+    handle([@yield, id, dets, nil, nil])
   end
 
   @impl true
   def handle([@yield, id, dets, arg_l]) do
-    handle([@yield, id, dets, arg_l, %{}])
+    handle([@yield, id, dets, arg_l, nil])
   end
 
   @impl true
index 714bb77ddbb44ec6b5d64f6c78892cdef16a2566..13c581e1a9342e312519430692c5f3f4d050596c 100644 (file)
@@ -70,6 +70,17 @@ defmodule Wampex.Roles.Peer do
           }
   end
 
+  defmodule Abort do
+    @moduledoc false
+    @enforce_keys [:reason]
+    defstruct [:reason, details: %{}]
+
+    @type t :: %__MODULE__{
+            reason: String.t(),
+            details: map()
+          }
+  end
+
   @impl true
   def add(roles), do: roles
 
@@ -86,6 +97,11 @@ defmodule Wampex.Roles.Peer do
     [@welcome, si, opts]
   end
 
+  @spec abort(Abort.t()) :: Wampex.message()
+  def abort(%Abort{reason: reason, details: opts}) do
+    [@abort, opts, reason]
+  end
+
   @spec challenge(Challenge.t()) :: Wampex.message()
   def challenge(%Challenge{auth_method: am, options: opts}) do
     [@challenge, am, opts]
@@ -134,12 +150,12 @@ defmodule Wampex.Roles.Peer do
 
   @impl true
   def handle([@error, type, id, dets, error]) do
-    handle([@error, type, id, dets, error, [], %{}])
+    handle([@error, type, id, dets, error, nil, nil])
   end
 
   @impl true
   def handle([@error, type, id, dets, error, arg_l]) do
-    handle([@error, type, id, dets, error, arg_l, %{}])
+    handle([@error, type, id, dets, error, arg_l, nil])
   end
 
   @impl true
index ed30fdc0772e987e7d535b6e3631407a80920d39..dfced57332f3b57e18ef8d812dc9b8833509bbf4 100644 (file)
@@ -60,12 +60,12 @@ defmodule Wampex.Roles.Subscriber do
 
   @impl true
   def handle([@event, sub_id, pub_id, dets]) do
-    handle([@event, sub_id, pub_id, dets, [], %{}])
+    handle([@event, sub_id, pub_id, dets, nil, nil])
   end
 
   @impl true
   def handle([@event, sub_id, pub_id, dets, arg_l]) do
-    handle([@event, sub_id, pub_id, dets, arg_l, %{}])
+    handle([@event, sub_id, pub_id, dets, arg_l, nil])
   end
 
   @impl true
index fda6cabfde622b99f0c6194c84c057830c655bfd..2f515c4dba1677089f79c627a3b83dc0ea85db1a 100644 (file)
@@ -9,7 +9,7 @@ defmodule Wampex.Router.Session do
   alias StatesLanguage, as: SL
   alias Wampex.Realm
   alias Wampex.Roles.{Broker, Dealer, Peer}
-  alias Wampex.Roles.Peer.{Challenge, Welcome}
+  alias Wampex.Roles.Peer.{Challenge, Welcome, Abort}
   alias Wampex.Router
   alias __MODULE__, as: Sess
   alias Broker.{Subscribed, Published, Event}
@@ -43,7 +43,8 @@ defmodule Wampex.Router.Session do
     invocations: [],
     message_queue: [],
     request_id: 0,
-    requests: []
+    requests: [],
+    hello_received: false
   ]
 
   @type t :: %__MODULE__{
@@ -70,7 +71,8 @@ defmodule Wampex.Router.Session do
           invocations: [],
           message_queue: [],
           request_id: integer(),
-          requests: []
+          requests: [],
+          hello_received: boolean()
         }
 
   ## States
@@ -113,7 +115,7 @@ defmodule Wampex.Router.Session do
         @handle_init,
         _,
         @init,
-        data
+        %SL{data: %Sess{transport_pid: t}} = data
       ) do
     debug("Init")
 
@@ -140,9 +142,18 @@ defmodule Wampex.Router.Session do
         %SL{data: %Sess{message: msg, roles: roles}} = data
       ) do
     debug("Handling Message #{inspect(msg)}")
-    {actions, id, response} = handle_message(msg, roles)
-    {data, response} = maybe_update_response(data, response)
-    debug("Response: #{inspect(response)}")
+
+    {data, actions} =
+      case handle_message(msg, roles) do
+        {actions, id, response} ->
+          {data, response} = maybe_update_response(data, response)
+          debug("Response: #{inspect(response)}")
+          {data, actions}
+
+        nil ->
+          {data, [{:next_event, :internal, :abort}]}
+      end
+
     {:ok, data, actions}
   end
 
@@ -157,17 +168,41 @@ defmodule Wampex.Router.Session do
               id: id,
               transport: tt,
               transport_pid: t,
-              hello: {:hello, realm, dets}
+              hello: {:hello, realm, dets},
+              hello_received: false
             } = data
         } = sl
       ) do
     # TODO check for authentication request
-    tt.send_request(t, Peer.welcome(%Welcome{session_id: id}))
+    send_to_peer(Peer.welcome(%Welcome{session_id: id}), tt, t)
 
-    {:ok, %SL{sl | data: %Sess{data | realm: realm, peer_information: dets}},
+    {:ok,
+     %SL{sl | data: %Sess{data | hello_received: true, realm: realm, peer_information: dets}},
      [{:next_event, :internal, :transition}]}
   end
 
+  @impl true
+  def handle_resource(
+        @handle_hello,
+        _,
+        @hello,
+        %SL{
+          data:
+            %Sess{
+              hello_received: true
+            } = data
+        } = sl
+      ) do
+    {:ok,
+     %SL{
+       sl
+       | data: %Sess{
+           data
+           | error: "wamp.error.protocol_violation"
+         }
+     }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]}
+  end
+
   @impl true
   def handle_resource(
         @handle_subscribe,
@@ -359,15 +394,6 @@ defmodule Wampex.Router.Session do
     {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions}
   end
 
-  defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
-
-  defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
-    case GenServer.call({proxy, node}, {:is_up, pid}) do
-      true -> c
-      false -> get_live_callee(proxy, t)
-    end
-  end
-
   @impl true
   def handle_resource(
         @handle_yield,
@@ -398,7 +424,13 @@ defmodule Wampex.Router.Session do
   end
 
   @impl true
-  def handle_resource(@handle_abort, _, @abort, data) do
+  def handle_resource(
+        @handle_abort,
+        _,
+        @abort,
+        %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data
+      ) do
+    send_to_peer(Peer.abort(%Abort{reason: er}), tt, t)
     Logger.warn("Aborting: #{data.data.error}")
     {:ok, data, []}
   end
@@ -457,7 +489,13 @@ defmodule Wampex.Router.Session do
 
   @impl true
   def handle_termination(reason, _, %SL{
-        data: %Sess{db: db, registrations: regs, subscriptions: subs}
+        data: %Sess{
+          db: db,
+          transport: tt,
+          transport_pid: t,
+          registrations: regs,
+          subscriptions: subs
+        }
       }) do
     filter = fn {id, values}, value ->
       {id, List.delete(values, value)}
@@ -468,6 +506,21 @@ defmodule Wampex.Router.Session do
     Enum.each(regs, fn {id, proc} ->
       ClusterKV.update(db, proc, {id, me}, filter)
     end)
+
+    send_to_peer(
+      Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
+      tt,
+      t
+    )
+  end
+
+  defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
+
+  defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
+    case GenServer.call({proxy, node}, {:is_up, pid}) do
+      true -> c
+      false -> get_live_callee(proxy, t)
+    end
   end
 
   defp get_subscribers({db, key, acc}) do
index 02173cb8640a8a6538917c1788aa11dee6134743..88521d7613a0130399463115247acdf31ef02aaa 100644 (file)
@@ -45,10 +45,26 @@ defmodule Wampex.Router.Transports.WebSocket do
   end
 
   @impl true
-  def websocket_info(_event, state) do
+  def websocket_info(
+        {:EXIT, _pid = session_pid, reason},
+        %WebSocket{session: session_pid} = state
+      ) do
+    Logger.warn("Session ended because #{inspect(reason)}, closing connection")
+    {:stop, state}
+  end
+
+  @impl true
+  def websocket_info(event, state) do
+    Logger.debug("Received unhandled event: #{inspect(event)}")
     {:ok, state}
   end
 
+  @impl true
+  def terminate(reason, _req, _state) do
+    Logger.info("Websocket closing for reason #{inspect(reason)}")
+    :ok
+  end
+
   defp handle_payload({payload, %WebSocket{serializer: s} = state}) do
     payload
     |> s.deserialize!()
index a55961633d3fe2eac0b28fdbeceb53ae72e1570c..30cd19ab7bea4d9ea790d17481e24f3c7c306db3 100644 (file)
@@ -19,7 +19,7 @@ defmodule WampexTest do
 
   @url "ws://localhost:4000/ws"
   @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"}
-  @realm %Realm{name: "myrealm"}
+  @realm %Realm{name: "org.entropealabs.iot"}
   @roles [Callee, Caller, Publisher, Subscriber]
   @device "as987d9a8sd79a87ds"
 
@@ -287,7 +287,7 @@ defmodule WampexTest do
     callee_name = TestAbort
     {:ok, pid} = Client.start_link(name: callee_name, session: @session)
     Client.cast_send_request(callee_name, Peer.hello(%Hello{realm: "test", roles: [Callee]}))
-    assert_receive {:EXIT, ^pid, :shutdown}
+    assert_receive {:EXIT, ^pid, :shutdown}, 1000
   end
 
   @tag :client