]> Entropealabs - wampex.git/commitdiff
initial router session code and state machine in place
authorChristopher <chris@entropealabs.com>
Sun, 15 Mar 2020 02:14:47 +0000 (21:14 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 15 Mar 2020 02:14:47 +0000 (21:14 -0500)
14 files changed:
lib/client/session.ex
lib/client/transports/web_socket.ex
lib/roles/broker.ex [new file with mode: 0644]
lib/roles/dealer.ex [new file with mode: 0644]
lib/roles/peer.ex
lib/router.ex
lib/router/rest.ex [new file with mode: 0644]
lib/router/session.ex [new file with mode: 0644]
lib/router/transports/web_socket.ex [new file with mode: 0644]
lib/wampex.ex
mix.exs
mix.lock
priv/router.json [new file with mode: 0644]
test/wampex_test.exs

index 997d557853833f9982f6e1cf6da84fdedad6ceac..73402c39d1908a714c3ca73f2b1f9a7fdef4f3bc 100644 (file)
@@ -13,7 +13,7 @@ defmodule Wampex.Client.Session do
   alias Wampex.Serializers.MessagePack
   alias __MODULE__, as: Sess
   alias Wampex.Client
-  alias Wampex.Client.Transport.WebSocket
+  alias Wampex.Client.Transports.WebSocket
 
   @yield 70
 
index ff1a1669ca33b898e9f519045438b949c029134b..b0c3d857d3a4613a6999f3c3b32fcdd7a62ce3e1 100644 (file)
@@ -1,4 +1,4 @@
-defmodule Wampex.Client.Transport.WebSocket do
+defmodule Wampex.Client.Transports.WebSocket do
   use WebSockex
 
   @behaviour Wampex.Transport
diff --git a/lib/roles/broker.ex b/lib/roles/broker.ex
new file mode 100644 (file)
index 0000000..2f5e955
--- /dev/null
@@ -0,0 +1,120 @@
+defmodule Wampex.Roles.Broker do
+  @moduledoc """
+  Handles requests and responses for a Broker
+  """
+  alias Wampex.Role
+  @behaviour Role
+
+  @publish 16
+  @published 17
+  @subscribe 32
+  @subscribed 33
+  @unsubscribe 34
+  @unsubscribed 35
+  @event 36
+
+  defmodule Event do
+    @moduledoc false
+    @enforce_keys [:subscription_id, :publication_id]
+    defstruct [:subscription_id, :publication_id, :arg_list, :arg_kw, options: %{}]
+
+    @type t :: %__MODULE__{
+            subscription_id: integer(),
+            publication_id: integer(),
+            arg_list: nonempty_list(any()) | nil,
+            arg_kw: map() | nil,
+            options: map()
+          }
+  end
+
+  defmodule Subscribed do
+    @moduledoc false
+    @enforce_keys [:request_id, :subscription_id]
+    defstruct [:request_id, :subscription_id]
+
+    @type t :: %__MODULE__{
+            request_id: integer(),
+            subscription_id: integer()
+          }
+  end
+
+  defmodule Unsubscribed do
+    @moduledoc false
+    @enforce_keys [:request_id]
+    defstruct [:request_id]
+
+    @type t :: %__MODULE__{
+            request_id: integer()
+          }
+  end
+
+  defmodule Published do
+    @moduledoc false
+    @enforce_keys [:request_id, :publication_id]
+    defstruct [:request_id, :publication_id]
+
+    @type t :: %__MODULE__{
+            request_id: integer(),
+            publication_id: integer()
+          }
+  end
+
+  @impl true
+  def add(roles) do
+    Map.put(roles, :broker, %{})
+  end
+
+  @spec event(Event.t()) :: Wampex.message()
+  def event(%Event{
+        subscription_id: si,
+        publication_id: pi,
+        arg_list: al,
+        arg_kw: akw,
+        options: opts
+      }) do
+    [@event, si, pi, opts, al, akw]
+  end
+
+  @spec subscribed(Subscribed.t()) :: Wampex.message()
+  def subscribed(%Subscribed{request_id: ri, subscription_id: si}) do
+    [@subscribed, ri, si]
+  end
+
+  @spec unsubscribed(Unsubscribed.t()) :: Wampex.message()
+  def unsubscribed(%Unsubscribed{request_id: ri}) do
+    [@unsubscribed, ri]
+  end
+
+  @spec published(Published.t()) :: Wampex.message()
+  def published(%Published{request_id: ri, publication_id: pi}) do
+    [@published, ri, pi]
+  end
+
+  @impl true
+  def handle([@subscribe, request_id, opts, topic]) do
+    {[{:next_event, :internal, :subscribe}], request_id,
+     {:update, :subscribe, {:subscribe, request_id, opts, topic}}}
+  end
+
+  @impl true
+  def handle([@unsubscribe, request_id, id]) do
+    {[{:next_event, :internal, :unsubscribe}], request_id,
+     {:update, :unsubscribe, {:unsubscribe, request_id, id}}}
+  end
+
+  @impl true
+  def handle([@publish, id, opts, topic]) do
+    handle([@publish, id, opts, topic, [], %{}])
+  end
+
+  @impl true
+  def handle([@publish, id, opts, topic, arg_l]) do
+    handle([@publish, id, opts, topic, arg_l, %{}])
+  end
+
+  @impl true
+  def handle([@publish, id, opts, topic, arg_l, arg_kw]) do
+    {[{:next_event, :internal, :publish}], id,
+     {:update, :publish, {:publish, id, opts, topic, arg_l, arg_kw}}}
+  end
+end
diff --git a/lib/roles/dealer.ex b/lib/roles/dealer.ex
new file mode 100644 (file)
index 0000000..a4c8624
--- /dev/null
@@ -0,0 +1,142 @@
+defmodule Wampex.Roles.Dealer do
+  @moduledoc """
+  Handles requests and responses for a Dealer
+  """
+  alias Wampex.Role
+  @behaviour Role
+
+  @call 48
+  @result 50
+  @register 64
+  @registered 65
+  @unregister 66
+  @unregistered 67
+  @invocation 68
+  @yield 70
+
+  defmodule Registered do
+    @moduledoc false
+    @enforce_keys [:request_id, :registration_id]
+    defstruct [:request_id, :registration_id]
+
+    @type t :: %__MODULE__{
+            request_id: integer(),
+            registration_id: integer()
+          }
+  end
+
+  defmodule Unregistered do
+    @moduledoc false
+    @enforce_keys [:request_id]
+    defstruct [:request_id]
+
+    @type t :: %__MODULE__{
+            request_id: integer()
+          }
+  end
+
+  defmodule Result do
+    @moduledoc false
+    @enforce_keys [:request_id]
+    defstruct [:request_id, :arg_list, :arg_kw, options: %{}]
+
+    @type t :: %__MODULE__{
+            request_id: integer(),
+            arg_list: nonempty_list(any()) | nil,
+            arg_kw: map() | nil,
+            options: map()
+          }
+  end
+
+  defmodule Invocation do
+    @moduledoc false
+    @enforce_keys [:request_id, :registration_id]
+    defstruct [:request_id, :registration_id, :arg_list, :arg_kw, options: %{}]
+
+    @type t :: %__MODULE__{
+            request_id: integer(),
+            registration_id: integer(),
+            arg_list: nonempty_list(any()) | nil,
+            arg_kw: map() | nil,
+            options: map()
+          }
+  end
+
+  @impl true
+  def add(roles) do
+    Map.put(roles, :dealer, %{})
+  end
+
+  @spec registered(Registered.t()) :: Wampex.message()
+  def registered(%Registered{request_id: ri, registration_id: reg_id}) do
+    [@registered, ri, reg_id]
+  end
+
+  @spec unregistered(Unregistered.t()) :: Wampex.message()
+  def unregistered(%Unregistered{request_id: ri}) do
+    [@unregistered, ri]
+  end
+
+  @spec invocation(Invocation.t()) :: Wampex.message()
+  def invocation(%Invocation{
+        request_id: ri,
+        registration_id: reg_id,
+        options: opts,
+        arg_list: al,
+        arg_kw: akw
+      }) do
+    [@invocation, ri, reg_id, opts, al, akw]
+  end
+
+  @spec result(Result.t()) :: Wampex.message()
+  def result(%Result{
+        request_id: ri,
+        arg_list: al,
+        arg_kw: akw,
+        options: opts
+      }) do
+    [@result, ri, opts, al, akw]
+  end
+
+  @impl true
+  def handle([@unregister, request_id, registration_id]) do
+    {[{:next_event, :internal, :unregister}], request_id,
+     {:update, :unregister, {:unregister, request_id, registration_id}}}
+  end
+
+  @impl true
+  def handle([@register, request_id, opts, procedure]) do
+    {[{:next_event, :internal, :register}], request_id,
+     {:update, :register, {:register, request_id, opts, procedure}}}
+  end
+
+  @impl true
+  def handle([@call, id, dets]) do
+    handle([@call, id, dets, [], %{}])
+  end
+
+  @impl true
+  def handle([@call, id, dets, arg_l]) do
+    handle([@call, id, dets, arg_l, %{}])
+  end
+
+  @impl true
+  def handle([@call, id, dets, arg_l, arg_kw]) do
+    {[{:next_event, :internal, :call}], id, {:update, :call, {:call, id, dets, arg_l, arg_kw}}}
+  end
+
+  @impl true
+  def handle([@yield, id, dets]) do
+    handle([@yield, id, dets, [], %{}])
+  end
+
+  @impl true
+  def handle([@yield, id, dets, arg_l]) do
+    handle([@yield, id, dets, arg_l, %{}])
+  end
+
+  @impl true
+  def handle([@yield, id, dets, arg_l, arg_kw]) do
+    {[{:next_event, :internal, :yield}], id, {:update, :yield, {:yield, id, dets, arg_l, arg_kw}}}
+  end
+end
index b1fffba0854709d36b36a200d13906050c24c749..714bb77ddbb44ec6b5d64f6c78892cdef16a2566 100644 (file)
@@ -26,6 +26,28 @@ defmodule Wampex.Roles.Peer do
           }
   end
 
+  defmodule Welcome do
+    @moduledoc false
+    @enforce_keys [:session_id]
+    defstruct [:session_id, options: %{}]
+
+    @type t :: %__MODULE__{
+            session_id: integer(),
+            options: %{}
+          }
+  end
+
+  defmodule Challenge do
+    @moduledoc false
+    @enforce_keys [:auth_method]
+    defstruct [:auth_method, options: %{}]
+
+    @type t :: %__MODULE__{
+            auth_method: String.t(),
+            options: %{}
+          }
+  end
+
   defmodule Goodbye do
     @moduledoc false
     @enforce_keys [:reason]
@@ -59,6 +81,16 @@ defmodule Wampex.Roles.Peer do
     [@hello, r, options]
   end
 
+  @spec welcome(Welcome.t()) :: Wampex.message()
+  def welcome(%Welcome{session_id: si, options: opts}) do
+    [@welcome, si, opts]
+  end
+
+  @spec challenge(Challenge.t()) :: Wampex.message()
+  def challenge(%Challenge{auth_method: am, options: opts}) do
+    [@challenge, am, opts]
+  end
+
   @spec goodbye(Goodbye.t()) :: Wampex.message()
   def goodbye(%Goodbye{reason: r, options: opts}) do
     [@goodbye, opts, r]
@@ -69,6 +101,17 @@ defmodule Wampex.Roles.Peer do
     [@authenticate, s, e]
   end
 
+  @impl true
+  def handle([@hello, realm, dets]) do
+    {[{:next_event, :internal, :hello}], nil, {:update, :hello, {:hello, realm, dets}}}
+  end
+
+  @impl true
+  def handle([@authenticate, sig, dets]) do
+    {[{:next_event, :internal, :authenticate}], nil,
+     {:update, :authenticate, {:authenticate, sig, dets}}}
+  end
+
   @impl true
   def handle([@welcome, session_id, _dets]) do
     {[{:next_event, :internal, :established}], nil, {:update, :id, session_id}}
index 0410ab5d83fee182d4223ee8818790ea593ae977..2bd3f6a9359178f33ebcb096dd1889b0d72e65f5 100644 (file)
@@ -1,2 +1,33 @@
 defmodule Wampex.Router do
+  use Supervisor
+
+  alias Wampex.Router.REST
+  alias Wampex.Router.Transports.WebSocket
+
+  @spec start_link(name: module(), port: pos_integer()) ::
+          {:ok, pid()}
+          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
+  def start_link(name: name, port: port) when is_atom(name) do
+    Supervisor.start_link(__MODULE__, port, name: name)
+  end
+
+  @spec init(pos_integer()) ::
+          {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
+  def init(port) do
+    children = [
+      {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch()]}
+    ]
+
+    Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10)
+  end
+
+  defp dispatch do
+    [
+      {:_,
+       [
+         {"/ws", WebSocket, []},
+         {:_, Plug.Cowboy.Handler, {REST, []}}
+       ]}
+    ]
+  end
 end
diff --git a/lib/router/rest.ex b/lib/router/rest.ex
new file mode 100644 (file)
index 0000000..a15ad87
--- /dev/null
@@ -0,0 +1,25 @@
+defmodule Wampex.Router.REST do
+  @moduledoc false
+  use Plug.Router
+  import Plug.Conn
+
+  plug(CORSPlug, origin: ["*"])
+  plug(:match)
+  plug(:dispatch)
+
+  get "/favicon.ico" do
+    send_resp(conn, 200, "ok")
+  end
+
+  get "/alivez" do
+    send_resp(conn, 200, "ok")
+  end
+
+  get "/readyz" do
+    send_resp(conn, 200, "ok")
+  end
+
+  match _ do
+    send_resp(conn, 404, "oops")
+  end
+end
diff --git a/lib/router/session.ex b/lib/router/session.ex
new file mode 100644 (file)
index 0000000..895eab5
--- /dev/null
@@ -0,0 +1,296 @@
+defmodule Wampex.Router.Session do
+  @moduledoc """
+  A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
+  """
+  use StatesLanguage, data: "priv/router.json"
+
+  @max_id 9_007_199_254_740_992
+
+  alias StatesLanguage, as: SL
+  alias Wampex.Realm
+  alias Wampex.Roles.{Broker, Dealer, Peer}
+  alias Wampex.Roles.Peer.{Challenge, Welcome}
+  alias Wampex.Router
+  alias __MODULE__, as: Sess
+
+  @enforce_keys [:transport, :transport_pid]
+  defstruct [
+    :id,
+    :transport,
+    :transport_pid,
+    :message,
+    :hello,
+    :authenticate,
+    :register,
+    :unregister,
+    :call,
+    :yield,
+    :subscribe,
+    :unsubscribe,
+    :publish,
+    :error,
+    :realm,
+    :name,
+    :goodbye,
+    roles: [Peer, Broker, Dealer],
+    message_queue: [],
+    request_id: 0,
+    requests: []
+  ]
+
+  @type t :: %__MODULE__{
+          id: integer() | nil,
+          transport_pid: pid() | module() | nil,
+          message: Wampex.message() | nil,
+          hello: Wampex.hello() | nil,
+          authenticate: Wampex.authenticate() | nil,
+          register: Wampex.register() | nil,
+          unregister: Wampex.unregister() | nil,
+          call: Wampex.call() | nil,
+          yield: Wampex.yield() | nil,
+          subscribe: Wampex.subscribe() | nil,
+          unsubscribe: Wampex.unsubscribe() | nil,
+          publish: Wampex.publish() | nil,
+          goodbye: binary() | nil,
+          realm: Realm.t(),
+          name: module() | nil,
+          roles: [module()],
+          message_queue: [],
+          request_id: integer(),
+          requests: []
+        }
+
+  ## States
+  @init "Init"
+  @established "Established"
+  @hello "Hello"
+  @authenticate "Authenticate"
+  @call "Call"
+  @yield "Yield"
+  @register "Register"
+  @unregister "Unregister"
+  @subscribe "Subscribe"
+  @unsubscribe "Unsubscribe"
+  @publish "Publish"
+  @error "Error"
+  @abort "Abort"
+  @goodbye "Goodbye"
+  @event "Cancel"
+
+  ## Resources
+  @handle_init "HandleInit"
+  @handle_established "HandleEstablished"
+  @handle_message "HandleMessage"
+  @handle_hello "HandleHello"
+  @handle_authenticate "HandleAuthenticate"
+  @handle_call "HandleCall"
+  @handle_yield "HandleYield"
+  @handle_register "HandleRegister"
+  @handle_unregister "HandleUnregister"
+  @handle_subscribe "HandleSubscribe"
+  @handle_unsubscribe "HandleUnsubscribe"
+  @handle_publish "HandlePublish"
+  @handle_interrupt "HandleError"
+  @handle_abort "HandleAbort"
+  @handle_goodbye "HandleGoodbye"
+  @handle_cancel "HandleCancel"
+
+  @impl true
+  def handle_resource(
+        @handle_init,
+        _,
+        @init,
+        data
+      ) do
+    debug("Init")
+
+    {:ok, %SL{data | data: %Sess{data.data | id: Enum.random(0..@max_id)}},
+     [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_established,
+        _,
+        @established,
+        data
+      ) do
+    debug("Established")
+    {:ok, data, []}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_message,
+        _,
+        _,
+        %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
+      ) do
+    debug("Handling Message #{inspect(msg)}")
+    {actions, id, response} = handle_message(msg, roles)
+    {%SL{data: sess} = data, response} = maybe_update_response(data, response)
+    {requests, actions} = resp = handle_response(id, actions, requests, response)
+    debug("Response: #{inspect(resp)}")
+    {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_hello,
+        _,
+        @hello,
+        %SL{
+          data: %Sess{id: id, transport: tt, transport_pid: t}
+        } = sl
+      ) do
+    # TODO check for authentication request
+    tt.send_request(t, Peer.welcome(%Welcome{session_id: id}))
+    {:ok, sl, [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_register,
+        _,
+        @register,
+        %SL{
+          data: %Sess{transport: tt, transport_pid: t, register: r}
+        } = sl
+      ) do
+    # TODO check for authentication request
+    debug("Handling Registration #{inspect(r)}")
+    {:ok, sl, [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_subscribe,
+        _,
+        @subscribe,
+        %SL{
+          data: %Sess{transport: tt, transport_pid: t, subscribe: s}
+        } = sl
+      ) do
+    # TODO check for authentication request
+    debug("Handling Subscription #{inspect(s)}")
+    {:ok, sl, [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(@handle_abort, _, @abort, data) do
+    Logger.warn("Aborting: #{data.data.error}")
+    {:ok, data, []}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_goodbye,
+        _,
+        @goodbye,
+        %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
+      ) do
+    tt.send_message(t, Peer.goodbye(goodbye))
+    {:ok, data, []}
+  end
+
+  @impl true
+  def handle_resource(resource, _, state, data) do
+    Logger.error("No specific resource handler for #{resource} in state #{state}")
+    {:ok, data, []}
+  end
+
+  @impl true
+  def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
+    {:ok, %SL{data | data: %Sess{sess | message: message}},
+     [{:next_event, :internal, :message_received}]}
+  end
+
+  defp handle_response(nil, actions, requests, _), do: {requests, actions}
+
+  defp handle_response(id, actions, requests, response) do
+    Enum.reduce_while(requests, {requests, actions}, fn
+      {^id, nil}, _ ->
+        {:halt, {remove_request(id, requests), actions}}
+
+      {^id, from}, _ ->
+        {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
+
+      {_, _}, acc ->
+        {:cont, acc}
+    end)
+  end
+
+  defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
+    send(from, {:progress, arg_l, arg_kw})
+    []
+  end
+
+  defp get_response_action(from, response), do: {:reply, from, response}
+
+  defp maybe_update_response(data, {:update, key, resp}) do
+    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
+  end
+
+  defp maybe_update_response(data, resp), do: {data, resp}
+
+  defp do_send(r_id, tt, t, message) do
+    {request_id, message} = maybe_inject_request_id(r_id, message)
+    tt.send_request(t, remove_nil_values(message))
+    request_id
+  end
+
+  defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
+
+  defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
+
+  defp maybe_inject_request_id(r_id, message) do
+    request_id = get_request_id(r_id)
+    {request_id, List.insert_at(message, 1, request_id)}
+  end
+
+  defp get_request_id(current_id) when current_id == @max_id do
+    1
+  end
+
+  defp get_request_id(current_id) do
+    current_id + 1
+  end
+
+  defp remove_request(id, requests) do
+    Enum.filter(requests, fn
+      {^id, _} -> false
+      {_id, _} -> true
+    end)
+  end
+
+  defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}
+
+  defp send_message_queue(r_id, mq, tt, t, reqs) do
+    mq
+    |> Enum.reverse()
+    |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
+      r = do_send(id, tt, t, request)
+      {r, [{r, from} | requests]}
+    end)
+  end
+
+  def remove_cast_requests([]), do: []
+
+  def remove_cast_requests(requests) do
+    Enum.filter(requests, fn
+      {_, nil} -> false
+      {_, _} -> true
+    end)
+  end
+
+  defp handle_message(msg, roles) do
+    Enum.reduce_while(roles, nil, fn r, _ ->
+      try do
+        res = r.handle(msg)
+        {:halt, res}
+      rescue
+        FunctionClauseError -> {:cont, nil}
+      end
+    end)
+  end
+end
diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex
new file mode 100644 (file)
index 0000000..54ae007
--- /dev/null
@@ -0,0 +1,76 @@
+defmodule Wampex.Router.Transports.WebSocket do
+  @moduledoc false
+  @behaviour :cowboy_websocket
+
+  require Logger
+
+  alias __MODULE__
+  alias Wampex.Router.Session
+  alias Wampex.Serializers.{JSON, MessagePack}
+
+  @protocol_header "sec-websocket-protocol"
+  @json 'wamp.2.json'
+  @msgpack "wamp.2.msgpack"
+
+  defstruct [:serializer, :session]
+
+  def send_request(transport, message) do
+    send(transport, {:send_request, message})
+  end
+
+  @impl true
+  def init(req, _state) do
+    {:ok, serializer, protocol} = get_serializer(req)
+    req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
+    {:cowboy_websocket, req, %WebSocket{serializer: serializer}}
+  end
+
+  @impl true
+  def websocket_init(state) do
+    {:ok, session} = start_session(state)
+    Logger.info("Websocket Initialized: #{inspect(state)}")
+    {:ok, %WebSocket{state | session: session}}
+  end
+
+  @impl true
+  def websocket_handle({type, payload}, state) do
+    Logger.debug("Received #{type} Payload: #{payload}")
+    handle_payload({payload, state})
+    {:ok, state, :hibernate}
+  end
+
+  @impl true
+  def websocket_info({:send_request, message}, %WebSocket{serializer: s} = state) do
+    {[{s.data_type(), s.serialize!(message)}], state, :hibernate}
+  end
+
+  @impl true
+  def websocket_info(_event, state) do
+    {:ok, state}
+  end
+
+  defp handle_payload({payload, %WebSocket{serializer: s} = state}) do
+    payload
+    |> s.deserialize!()
+    |> handle_event(state)
+  end
+
+  defp handle_event(ms, %WebSocket{session: s} = state) do
+    send(s, {:set_message, ms})
+    {:ok, state}
+  end
+
+  defp start_session(_state) do
+    Session.start_link(%Session{transport: WebSocket, transport_pid: self()})
+  end
+
+  defp get_serializer(req) do
+    @protocol_header
+    |> :cowboy_req.parse_header(req)
+    |> parse_protocol()
+  end
+
+  defp parse_protocol([@json]), do: {:ok, JSON, @json}
+  defp parse_protocol([@msgpack]), do: {:ok, MessagePack, @msgpack}
+  defp parse_protocol(test), do: Logger.error("Unknown protocol: #{inspect(test)}")
+end
index eb2bdb47b551357cb997db15b42ef7c4014e03e1..30a6f1f8349ed648deadc84749a649540bcdfce3 100644 (file)
@@ -6,6 +6,34 @@ defmodule Wampex do
   @type message :: nonempty_list(message_part())
   @type arg_list :: [] | nonempty_list(any())
   @type arg_keyword :: map()
+
+  @type hello :: {:hello, realm :: String.t(), details :: map()}
+
+  @type authenticate :: {:authenticate, signature :: binary(), details :: map()}
+
+  @type call ::
+          {:call, request_id :: integer(), details :: map(), arg_list :: arg_list(),
+           arg_keywords :: arg_keyword()}
+
+  @type yield ::
+          {:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(),
+           arg_keywords :: arg_keyword()}
+
+  @type publish ::
+          {:publish, request_id :: integer(), details :: map(), topic :: String.t(),
+           arg_list :: arg_list(), arg_keywords :: arg_keyword()}
+
+  @type register ::
+          {:register, request_id :: integer(), details :: map(), procedure :: String.t()}
+
+  @type unregister ::
+          {:unregister, request_id :: integer(), id :: integer()}
+
+  @type unsubscribe ::
+          {:unsubscribe, request_id :: integer(), id :: integer()}
+
+  @type subscribe ::
+          {:subscribe, request_id :: integer(), topic :: String.t(), details :: map()}
   @type invocation ::
           {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(),
            arg_list :: arg_list(), arg_keywords :: arg_keyword()}
@@ -18,7 +46,16 @@ defmodule Wampex do
              arg_keyword :: arg_keyword()}
   @type handle_response ::
           {:ok, integer()}
+          | publish()
+          | hello()
+          | authenticate()
+          | unsubscribe()
+          | subscribe()
           | invocation()
+          | register()
+          | unregister()
+          | call()
+          | yield()
           | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()}
           | error()
           | event()
diff --git a/mix.exs b/mix.exs
index f28dd69c905574c1a6cbd9cff41e7d29d738a2c1..4de3490d56664d0ce88a42b17d7297bcde735ade 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -37,13 +37,15 @@ defmodule Wampex.MixProject do
 
   defp deps do
     [
-      {:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false},
+      {:cors_plug, "~> 2.0"},
       {:credo, "~> 1.2", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
       {:ex_doc, "~> 0.21", only: :dev, runtime: false},
+      {:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false},
       {:jason, "~> 1.1"},
       {:msgpack, "~> 0.7.0"},
       {:pbkdf2, "~> 2.0"},
+      {:plug_cowboy, "~> 2.1"},
       {:states_language, "~> 0.2"},
       {:websockex, "~> 0.4"}
     ]
index 5fb72e4212d426042919b8eac2fdee89b70908ed..bc5f8f6bb56b027737cdcd14b411d5e0eaf1babd 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -2,7 +2,10 @@
   "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
   "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
   "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
+  "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"},
   "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"},
+  "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
+  "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"},
   "credo": {:hex, :credo, "1.2.2", "f57faf60e0a12b0ba9fd4bad07966057fde162b33496c509b95b027993494aab", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8f2623cd8c895a6f4a55ef10f3fdf6a55a9ca7bef09676bd835551687bf8a740"},
   "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
   "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
   "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"},
   "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
   "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
+  "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"},
   "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
   "msgpack": {:hex, :msgpack, "0.7.0", "128ae0a2227c7e7a2847c0f0f73551c268464f8c1ee96bffb920bc0a5712b295", [:rebar3], [], "hexpm", "4649353da003e6f438d105e4b1e0f17757f6f5ec8687a6f30875ff3ac4ce2a51"},
   "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
   "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
   "pbkdf2": {:hex, :pbkdf2, "2.0.0", "11c23279fded5c0027ab3996cfae77805521d7ef4babde2bd7ec04a9086cf499", [:rebar3], [], "hexpm", "1e793ce6fdb0576613115714deae9dfc1d1537eaba74f07efb36de139774488d"},
+  "plug": {:hex, :plug, "1.9.0", "8d7c4e26962283ff9f8f3347bd73838e2413fbc38b7bb5467d5924f68f3a5a4a", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "9902eda2c52ada2a096434682e99a2493f5d06a94d6ac6bcfff9805f952350f1"},
+  "plug_cowboy": {:hex, :plug_cowboy, "2.1.2", "8b0addb5908c5238fac38e442e81b6fcd32788eaa03246b4d55d147c47c5805e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7d722581ce865a237e14da6d946f92704101740a256bd13ec91e63c0b122fc70"},
+  "plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"},
+  "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
   "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
   "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
   "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
diff --git a/priv/router.json b/priv/router.json
new file mode 100644 (file)
index 0000000..a0b99a3
--- /dev/null
@@ -0,0 +1,147 @@
+{
+  "Comment": "Router Session State Machine",
+  "StartAt": "Init",
+  "States": {
+    "Init": {
+      "Type": "Task",
+      "Resource": "HandleInit",
+      "Next": "Established"
+    },
+    "Established": {
+      "Type": "Choice",
+      "Resource": "HandleEstablished",
+      "Choices": [
+        {
+          "StringEquals": ":message_received",
+          "Next": "Message"
+        },
+        {
+          "StringEquals": ":goodbye",
+          "Next": "GoodBye"
+        },
+        {
+          "StringEquals": ":abort",
+          "Next": "Abort"
+        }
+      ]
+    },
+    "Message": {
+      "Type": "Choice",
+      "Resource": "HandleMessage",
+      "Choices": [
+        {
+          "StringEquals": ":hello",
+          "Next": "Hello"
+        },
+        {
+          "StringEquals": ":authenticate",
+          "Next": "Authenticate"
+        },
+        {
+          "StringEquals": ":call",
+          "Next": "Call"
+        },
+        {
+          "StringEquals": ":yield",
+          "Next": "Yield"
+        },
+        {
+          "StringEquals": ":register",
+          "Next": "Register"
+        },
+        {
+          "StringEquals": ":unregister",
+          "Next": "Unregister"
+        },
+        {
+          "StringEquals": ":subscribe",
+          "Next": "Subscribe"
+        },
+        {
+          "StringEquals": ":unsubscribe",
+          "Next": "Unsubscribe"
+        },
+        {
+          "StringEquals": ":publish",
+          "Next": "Publish"
+        },
+        {
+          "StringEquals": ":error",
+          "Next": "Error"
+        },
+        {
+          "StringEquals": ":abort",
+          "Next": "Abort"
+        },
+        {
+          "StringEquals": ":goodbye",
+          "Next": "GoodBye"
+        },
+        {
+          "StringEquals": ":cancel",
+          "Next": "Cancel"
+        }
+      ]
+    },
+    "Hello": {
+      "Type": "Task",
+      "Resource": "HandleHello",
+      "Next": "Established"
+    },
+    "Authenticate": {
+      "Type": "Task",
+      "Resource": "HandleAuthenticate",
+      "Next": "Established"
+    },
+    "Call": {
+      "Type": "Task",
+      "Resource": "HandleCall",
+      "Next": "Established"
+    },
+    "Yield": {
+      "Type": "Task",
+      "Resource": "HandleYield",
+      "Next": "Established"
+    },
+    "Register":{
+      "Type": "Task",
+      "Resource": "HandleRegister",
+      "Next": "Established"
+    },
+    "Unregister":{
+      "Type": "Task",
+      "Resource": "HandleUnregister",
+      "Next": "Established"
+    },
+    "Subscribe":{
+      "Type": "Task",
+      "Resource": "HandleSubscribe",
+      "Next": "Established"
+    },
+    "Unsubscribe":{
+      "Type": "Task",
+      "Resource": "HandleUnsubscribe",
+      "Next": "Established"
+    },
+    "Publish":{
+      "Type": "Task",
+      "Resource": "HandlePublish",
+      "Next": "Established"
+    },
+    "Error":{
+      "Type": "Task",
+      "Resource": "HandleError",
+      "Next": "Established"
+    },
+    "GoodBye": {
+      "Type": "Task",
+      "Resource": "HandleGoodbye",
+      "End": true
+    },
+    "Abort": {
+      "Type": "Task",
+      "Resource": "HandleAbort",
+      "End": true
+    }
+  }
+}
index 903f1b594ff949d733050f9f413fa34ed63cc752..b9b8ef4f622c97feef61f0de37811074427c47bd 100644 (file)
@@ -4,52 +4,67 @@ defmodule WampexTest do
 
   alias Wampex.{Authentication, Realm}
   alias Wampex.Client
-  alias Wampex.Roles.{Callee, Caller, Peer, Publisher, Subscriber}
+  alias Wampex.Roles.{Broker, Callee, Caller, Dealer, Peer, Publisher, Subscriber}
+  alias Wampex.Router
   alias Wampex.Serializers.{JSON, MessagePack}
+  alias Broker.{Event, Published, Subscribed, Unsubscribed}
   alias Callee.{Register, Unregister, Yield}
   alias Caller.Call
   alias Client.Session
+  alias Dealer.{Invocation, Registered, Result, Unregistered}
   alias Peer.{Authenticate, Goodbye, Hello}
   alias Publisher.Publish
   alias Subscriber.{Subscribe, Unsubscribe}
   require Logger
 
-  @url "ws://localhost:18080/ws"
+  @url "ws://localhost:4000/ws"
   @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"}
   @realm %Realm{name: "com.myrealm"}
   @roles [Callee, Caller, Publisher, Subscriber]
   @device "as987d9a8sd79a87ds"
 
+  setup_all do
+    [server: Router.start_link(name: TestRouter, port: 4000)]
+  end
+
   @session %Session{url: @url, realm: @realm, roles: @roles}
 
+  @tag :client
   test "session_name" do
     assert Test.Session = Client.session_name(Test)
   end
 
+  @tag :client
   test "subscriber_registry_name" do
     assert Test.SubscriberRegistry = Client.subscriber_registry_name(Test)
   end
 
+  @tag :client
   test "callee_registry_name" do
     assert Test.CalleeRegistry = Client.callee_registry_name(Test)
   end
 
+  @tag :client
   test "transport_name" do
     assert Test.Transport = Client.transport_name(Test)
   end
 
+  @tag :client
   test "Callee.add" do
     assert %{callee: %{}} = Callee.add(%{})
   end
 
+  @tag :client
   test "Callee.register" do
     assert [64, %{}, "test"] = Callee.register(%Register{procedure: "test"})
   end
 
+  @tag :client
   test "Callee.unregister" do
     assert [66, 123_456] = Callee.unregister(%Unregister{registration_id: 123_456})
   end
 
+  @tag :client
   test "Callee.yield" do
     assert [70, 1234, %{}, nil, nil] = Callee.yield(%Yield{request_id: 1234})
     assert [70, 1234, %{}, ["1"], nil] = Callee.yield(%Yield{request_id: 1234, arg_list: ["1"]})
@@ -63,10 +78,12 @@ defmodule WampexTest do
              })
   end
 
+  @tag :client
   test "Caller.add" do
     assert %{caller: %{}} = Caller.add(%{})
   end
 
+  @tag :client
   test "Caller.call" do
     assert [48, %{}, "test", nil, nil] = Caller.call(%Call{procedure: "test"})
     assert [48, %{}, "test", [1], nil] = Caller.call(%Call{procedure: "test", arg_list: [1]})
@@ -80,28 +97,34 @@ defmodule WampexTest do
              })
   end
 
+  @tag :client
   test "Peer.add" do
     assert %{} = Caller.add(%{})
   end
 
+  @tag :client
   test "Peer.hello" do
     assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] =
              Peer.hello(%Hello{realm: "test", roles: [Callee]})
   end
 
+  @tag :client
   test "Peer.authenticate" do
     assert [5, "a-signa-ture", %{}] ==
              Peer.authenticate(%Authenticate{signature: "a-signa-ture", extra: %{}})
   end
 
+  @tag :client
   test "Peer.gooodbye" do
     assert [6, %{}, "test"] = Peer.goodbye(%Goodbye{reason: "test"})
   end
 
+  @tag :client
   test "Publisher.add" do
     assert %{publisher: %{}} = Publisher.add(%{})
   end
 
+  @tag :client
   test "Publisher.publish" do
     assert [16, %{}, "test", nil, nil] = Publisher.publish(%Publish{topic: "test"})
     assert [16, %{}, "test", [1], nil] = Publisher.publish(%Publish{topic: "test", arg_list: [1]})
@@ -115,19 +138,114 @@ defmodule WampexTest do
              })
   end
 
+  @tag :client
   test "Subscriber.add" do
     assert %{subscriber: %{}} = Subscriber.add(%{})
   end
 
+  @tag :client
   test "Subscriber.subscribe" do
     assert [32, %{test: 1}, "test"] =
              Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}})
   end
 
+  @tag :client
   test "Subscriber.unsubscribe" do
     assert [34, 1234] = Subscriber.unsubscribe(%Unsubscribe{subscription_id: 1234})
   end
 
+  @tag :router
+  test "Dealer.add" do
+    assert %{dealer: %{}} = Dealer.add(%{})
+  end
+
+  @tag :router
+  test "Dealer.registered" do
+    assert [65, 123_456, 765_432] =
+             Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432})
+  end
+
+  @tag :router
+  test "Dealer.unregistered" do
+    assert [67, 123_456] = Dealer.unregistered(%Unregistered{request_id: 123_456})
+  end
+
+  @tag :router
+  test "Dealer.result" do
+    assert [50, 1234, %{}, nil, nil] = Dealer.result(%Result{request_id: 1234})
+    assert [50, 1234, %{}, ["1"], nil] = Dealer.result(%Result{request_id: 1234, arg_list: ["1"]})
+
+    assert [50, 1234, %{test: 1}, [2], %{}] =
+             Dealer.result(%Result{
+               request_id: 1234,
+               arg_list: [2],
+               arg_kw: %{},
+               options: %{test: 1}
+             })
+  end
+
+  @tag :router
+  test "Dealer.invocation" do
+    assert [68, 1234, 1234, %{}, nil, nil] =
+             Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234})
+
+    assert [68, 1234, 1234, %{}, ["1"], nil] =
+             Dealer.invocation(%Invocation{
+               request_id: 1234,
+               registration_id: 1234,
+               arg_list: ["1"]
+             })
+
+    assert [68, 1234, 1234, %{test: 1}, [2], %{test: 2}] =
+             Dealer.invocation(%Invocation{
+               request_id: 1234,
+               registration_id: 1234,
+               arg_list: [2],
+               arg_kw: %{test: 2},
+               options: %{test: 1}
+             })
+  end
+
+  @tag :router
+  test "Broker.add" do
+    assert %{broker: %{}} = Broker.add(%{})
+  end
+
+  @tag :router
+  test "Broker.event" do
+    assert [36, 123_456, 765_432, %{}, nil, nil] =
+             Broker.event(%Event{subscription_id: 123_456, publication_id: 765_432})
+
+    assert [36, 123_456, 765_432, %{}, [2], nil] =
+             Broker.event(%Event{subscription_id: 123_456, publication_id: 765_432, arg_list: [2]})
+
+    assert [36, 123_456, 765_432, %{}, [2], %{test: 1}] =
+             Broker.event(%Event{
+               subscription_id: 123_456,
+               publication_id: 765_432,
+               arg_list: [2],
+               arg_kw: %{test: 1}
+             })
+  end
+
+  @tag :router
+  test "Broker.subscribed" do
+    assert [33, 123_456, 123_456] =
+             Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456})
+  end
+
+  @tag :router
+  test "Broker.unsubscribed" do
+    assert [35, 123_456] = Broker.unsubscribed(%Unsubscribed{request_id: 123_456})
+  end
+
+  @tag :router
+  test "Broker.published" do
+    assert [17, 123_456, 654_321] =
+             Broker.published(%Published{request_id: 123_456, publication_id: 654_321})
+  end
+
+  @tag :client
   test "MessagePack Serializer" do
     assert :binary = MessagePack.data_type()
     assert "\x93\x05\xC4\t123456789\x80" = MessagePack.serialize!([5, "123456789", %{}])
@@ -136,6 +254,7 @@ defmodule WampexTest do
     assert {:ok, [5, "123456789", %{}]} = MessagePack.deserialize("\x93\x05\xC4\t123456789\x80")
   end
 
+  @tag :client
   test "JSON Serializer" do
     assert :text = JSON.data_type()
     assert "[5,\"123456789\",{}]" = JSON.serialize!([5, "123456789", %{}])
@@ -144,6 +263,7 @@ defmodule WampexTest do
     assert {:ok, [5, "123456789", %{}]} = JSON.deserialize("[5,\"123456789\",{}]")
   end
 
+  @tag :client
   test "callee registration" do
     name = TestCalleeRegistration
     Client.start_link(name: name, session: @session)
@@ -151,6 +271,7 @@ defmodule WampexTest do
     assert_receive {:registered, id}
   end
 
+  @tag :client
   test "authentication" do
     callee_name = TestCalleeRespondAuthenticated
     session = %Session{@session | realm: %Realm{@session.realm | authentication: @auth}}
@@ -159,6 +280,7 @@ defmodule WampexTest do
     assert_receive {:registered, id}
   end
 
+  @tag :client
   test "abort" do
     Process.flag(:trap_exit, true)
     callee_name = TestAbort
@@ -167,6 +289,7 @@ defmodule WampexTest do
     assert_receive {:EXIT, ^pid, :shutdown}
   end
 
+  @tag :client
   test "callee is invoked and responds and caller gets result" do
     callee_name = TestCalleeRespond
     Client.start_link(name: callee_name, session: @session)
@@ -188,6 +311,7 @@ defmodule WampexTest do
     assert_receive {:invocation, _, _, _, _, _}
   end
 
+  @tag :client
   test "subscriber registration" do
     name = TestSubscriberRegister
     Client.start_link(name: name, session: @session)
@@ -195,6 +319,7 @@ defmodule WampexTest do
     assert_receive {:subscribed, id}
   end
 
+  @tag :client
   test "subscriber receives events from publisher" do
     name = TestSubscriberEvents
     Client.start_link(name: name, session: @session)