From: Christopher Date: Sun, 15 Mar 2020 02:14:47 +0000 (-0500) Subject: initial router session code and state machine in place X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=d38bd99e0eca578cb8497002a79c4c7c49456794;p=wampex.git initial router session code and state machine in place --- diff --git a/lib/client/session.ex b/lib/client/session.ex index 997d557..73402c3 100644 --- a/lib/client/session.ex +++ b/lib/client/session.ex @@ -13,7 +13,7 @@ defmodule Wampex.Client.Session do alias Wampex.Serializers.MessagePack alias __MODULE__, as: Sess alias Wampex.Client - alias Wampex.Client.Transport.WebSocket + alias Wampex.Client.Transports.WebSocket @yield 70 diff --git a/lib/client/transports/web_socket.ex b/lib/client/transports/web_socket.ex index ff1a166..b0c3d85 100644 --- a/lib/client/transports/web_socket.ex +++ b/lib/client/transports/web_socket.ex @@ -1,4 +1,4 @@ -defmodule Wampex.Client.Transport.WebSocket do +defmodule Wampex.Client.Transports.WebSocket do use WebSockex @behaviour Wampex.Transport diff --git a/lib/roles/broker.ex b/lib/roles/broker.ex new file mode 100644 index 0000000..2f5e955 --- /dev/null +++ b/lib/roles/broker.ex @@ -0,0 +1,120 @@ +defmodule Wampex.Roles.Broker do + @moduledoc """ + Handles requests and responses for a Broker + """ + alias Wampex.Role + @behaviour Role + + @publish 16 + @published 17 + @subscribe 32 + @subscribed 33 + @unsubscribe 34 + @unsubscribed 35 + @event 36 + + defmodule Event do + @moduledoc false + @enforce_keys [:subscription_id, :publication_id] + defstruct [:subscription_id, :publication_id, :arg_list, :arg_kw, options: %{}] + + @type t :: %__MODULE__{ + subscription_id: integer(), + publication_id: integer(), + arg_list: nonempty_list(any()) | nil, + arg_kw: map() | nil, + options: map() + } + end + + defmodule Subscribed do + @moduledoc false + @enforce_keys [:request_id, :subscription_id] + defstruct [:request_id, :subscription_id] + + @type t :: %__MODULE__{ + request_id: integer(), + subscription_id: integer() + } + end + + defmodule Unsubscribed do + @moduledoc false + @enforce_keys [:request_id] + defstruct [:request_id] + + @type t :: %__MODULE__{ + request_id: integer() + } + end + + defmodule Published do + @moduledoc false + @enforce_keys [:request_id, :publication_id] + defstruct [:request_id, :publication_id] + + @type t :: %__MODULE__{ + request_id: integer(), + publication_id: integer() + } + end + + @impl true + def add(roles) do + Map.put(roles, :broker, %{}) + end + + @spec event(Event.t()) :: Wampex.message() + def event(%Event{ + subscription_id: si, + publication_id: pi, + arg_list: al, + arg_kw: akw, + options: opts + }) do + [@event, si, pi, opts, al, akw] + end + + @spec subscribed(Subscribed.t()) :: Wampex.message() + def subscribed(%Subscribed{request_id: ri, subscription_id: si}) do + [@subscribed, ri, si] + end + + @spec unsubscribed(Unsubscribed.t()) :: Wampex.message() + def unsubscribed(%Unsubscribed{request_id: ri}) do + [@unsubscribed, ri] + end + + @spec published(Published.t()) :: Wampex.message() + def published(%Published{request_id: ri, publication_id: pi}) do + [@published, ri, pi] + end + + @impl true + def handle([@subscribe, request_id, opts, topic]) do + {[{:next_event, :internal, :subscribe}], request_id, + {:update, :subscribe, {:subscribe, request_id, opts, topic}}} + end + + @impl true + def handle([@unsubscribe, request_id, id]) do + {[{:next_event, :internal, :unsubscribe}], request_id, + {:update, :unsubscribe, {:unsubscribe, request_id, id}}} + end + + @impl true + def handle([@publish, id, opts, topic]) do + handle([@publish, id, opts, topic, [], %{}]) + end + + @impl true + def handle([@publish, id, opts, topic, arg_l]) do + handle([@publish, id, opts, topic, arg_l, %{}]) + end + + @impl true + def handle([@publish, id, opts, topic, arg_l, arg_kw]) do + {[{:next_event, :internal, :publish}], id, + {:update, :publish, {:publish, id, opts, topic, arg_l, arg_kw}}} + end +end diff --git a/lib/roles/dealer.ex b/lib/roles/dealer.ex new file mode 100644 index 0000000..a4c8624 --- /dev/null +++ b/lib/roles/dealer.ex @@ -0,0 +1,142 @@ +defmodule Wampex.Roles.Dealer do + @moduledoc """ + Handles requests and responses for a Dealer + """ + alias Wampex.Role + @behaviour Role + + @call 48 + @result 50 + @register 64 + @registered 65 + @unregister 66 + @unregistered 67 + @invocation 68 + @yield 70 + + defmodule Registered do + @moduledoc false + @enforce_keys [:request_id, :registration_id] + defstruct [:request_id, :registration_id] + + @type t :: %__MODULE__{ + request_id: integer(), + registration_id: integer() + } + end + + defmodule Unregistered do + @moduledoc false + @enforce_keys [:request_id] + defstruct [:request_id] + + @type t :: %__MODULE__{ + request_id: integer() + } + end + + defmodule Result do + @moduledoc false + @enforce_keys [:request_id] + defstruct [:request_id, :arg_list, :arg_kw, options: %{}] + + @type t :: %__MODULE__{ + request_id: integer(), + arg_list: nonempty_list(any()) | nil, + arg_kw: map() | nil, + options: map() + } + end + + defmodule Invocation do + @moduledoc false + @enforce_keys [:request_id, :registration_id] + defstruct [:request_id, :registration_id, :arg_list, :arg_kw, options: %{}] + + @type t :: %__MODULE__{ + request_id: integer(), + registration_id: integer(), + arg_list: nonempty_list(any()) | nil, + arg_kw: map() | nil, + options: map() + } + end + + @impl true + def add(roles) do + Map.put(roles, :dealer, %{}) + end + + @spec registered(Registered.t()) :: Wampex.message() + def registered(%Registered{request_id: ri, registration_id: reg_id}) do + [@registered, ri, reg_id] + end + + @spec unregistered(Unregistered.t()) :: Wampex.message() + def unregistered(%Unregistered{request_id: ri}) do + [@unregistered, ri] + end + + @spec invocation(Invocation.t()) :: Wampex.message() + def invocation(%Invocation{ + request_id: ri, + registration_id: reg_id, + options: opts, + arg_list: al, + arg_kw: akw + }) do + [@invocation, ri, reg_id, opts, al, akw] + end + + @spec result(Result.t()) :: Wampex.message() + def result(%Result{ + request_id: ri, + arg_list: al, + arg_kw: akw, + options: opts + }) do + [@result, ri, opts, al, akw] + end + + @impl true + def handle([@unregister, request_id, registration_id]) do + {[{:next_event, :internal, :unregister}], request_id, + {:update, :unregister, {:unregister, request_id, registration_id}}} + end + + @impl true + def handle([@register, request_id, opts, procedure]) do + {[{:next_event, :internal, :register}], request_id, + {:update, :register, {:register, request_id, opts, procedure}}} + end + + @impl true + def handle([@call, id, dets]) do + handle([@call, id, dets, [], %{}]) + end + + @impl true + def handle([@call, id, dets, arg_l]) do + handle([@call, id, dets, arg_l, %{}]) + end + + @impl true + def handle([@call, id, dets, arg_l, arg_kw]) do + {[{:next_event, :internal, :call}], id, {:update, :call, {:call, id, dets, arg_l, arg_kw}}} + end + + @impl true + def handle([@yield, id, dets]) do + handle([@yield, id, dets, [], %{}]) + end + + @impl true + def handle([@yield, id, dets, arg_l]) do + handle([@yield, id, dets, arg_l, %{}]) + end + + @impl true + def handle([@yield, id, dets, arg_l, arg_kw]) do + {[{:next_event, :internal, :yield}], id, {:update, :yield, {:yield, id, dets, arg_l, arg_kw}}} + end +end diff --git a/lib/roles/peer.ex b/lib/roles/peer.ex index b1fffba..714bb77 100644 --- a/lib/roles/peer.ex +++ b/lib/roles/peer.ex @@ -26,6 +26,28 @@ defmodule Wampex.Roles.Peer do } end + defmodule Welcome do + @moduledoc false + @enforce_keys [:session_id] + defstruct [:session_id, options: %{}] + + @type t :: %__MODULE__{ + session_id: integer(), + options: %{} + } + end + + defmodule Challenge do + @moduledoc false + @enforce_keys [:auth_method] + defstruct [:auth_method, options: %{}] + + @type t :: %__MODULE__{ + auth_method: String.t(), + options: %{} + } + end + defmodule Goodbye do @moduledoc false @enforce_keys [:reason] @@ -59,6 +81,16 @@ defmodule Wampex.Roles.Peer do [@hello, r, options] end + @spec welcome(Welcome.t()) :: Wampex.message() + def welcome(%Welcome{session_id: si, options: opts}) do + [@welcome, si, opts] + end + + @spec challenge(Challenge.t()) :: Wampex.message() + def challenge(%Challenge{auth_method: am, options: opts}) do + [@challenge, am, opts] + end + @spec goodbye(Goodbye.t()) :: Wampex.message() def goodbye(%Goodbye{reason: r, options: opts}) do [@goodbye, opts, r] @@ -69,6 +101,17 @@ defmodule Wampex.Roles.Peer do [@authenticate, s, e] end + @impl true + def handle([@hello, realm, dets]) do + {[{:next_event, :internal, :hello}], nil, {:update, :hello, {:hello, realm, dets}}} + end + + @impl true + def handle([@authenticate, sig, dets]) do + {[{:next_event, :internal, :authenticate}], nil, + {:update, :authenticate, {:authenticate, sig, dets}}} + end + @impl true def handle([@welcome, session_id, _dets]) do {[{:next_event, :internal, :established}], nil, {:update, :id, session_id}} diff --git a/lib/router.ex b/lib/router.ex index 0410ab5..2bd3f6a 100644 --- a/lib/router.ex +++ b/lib/router.ex @@ -1,2 +1,33 @@ defmodule Wampex.Router do + use Supervisor + + alias Wampex.Router.REST + alias Wampex.Router.Transports.WebSocket + + @spec start_link(name: module(), port: pos_integer()) :: + {:ok, pid()} + | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} + def start_link(name: name, port: port) when is_atom(name) do + Supervisor.start_link(__MODULE__, port, name: name) + end + + @spec init(pos_integer()) :: + {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore + def init(port) do + children = [ + {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch()]} + ] + + Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10) + end + + defp dispatch do + [ + {:_, + [ + {"/ws", WebSocket, []}, + {:_, Plug.Cowboy.Handler, {REST, []}} + ]} + ] + end end diff --git a/lib/router/rest.ex b/lib/router/rest.ex new file mode 100644 index 0000000..a15ad87 --- /dev/null +++ b/lib/router/rest.ex @@ -0,0 +1,25 @@ +defmodule Wampex.Router.REST do + @moduledoc false + use Plug.Router + import Plug.Conn + + plug(CORSPlug, origin: ["*"]) + plug(:match) + plug(:dispatch) + + get "/favicon.ico" do + send_resp(conn, 200, "ok") + end + + get "/alivez" do + send_resp(conn, 200, "ok") + end + + get "/readyz" do + send_resp(conn, 200, "ok") + end + + match _ do + send_resp(conn, 404, "oops") + end +end diff --git a/lib/router/session.ex b/lib/router/session.ex new file mode 100644 index 0000000..895eab5 --- /dev/null +++ b/lib/router/session.ex @@ -0,0 +1,296 @@ +defmodule Wampex.Router.Session do + @moduledoc """ + A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation + """ + use StatesLanguage, data: "priv/router.json" + + @max_id 9_007_199_254_740_992 + + alias StatesLanguage, as: SL + alias Wampex.Realm + alias Wampex.Roles.{Broker, Dealer, Peer} + alias Wampex.Roles.Peer.{Challenge, Welcome} + alias Wampex.Router + alias __MODULE__, as: Sess + + @enforce_keys [:transport, :transport_pid] + defstruct [ + :id, + :transport, + :transport_pid, + :message, + :hello, + :authenticate, + :register, + :unregister, + :call, + :yield, + :subscribe, + :unsubscribe, + :publish, + :error, + :realm, + :name, + :goodbye, + roles: [Peer, Broker, Dealer], + message_queue: [], + request_id: 0, + requests: [] + ] + + @type t :: %__MODULE__{ + id: integer() | nil, + transport_pid: pid() | module() | nil, + message: Wampex.message() | nil, + hello: Wampex.hello() | nil, + authenticate: Wampex.authenticate() | nil, + register: Wampex.register() | nil, + unregister: Wampex.unregister() | nil, + call: Wampex.call() | nil, + yield: Wampex.yield() | nil, + subscribe: Wampex.subscribe() | nil, + unsubscribe: Wampex.unsubscribe() | nil, + publish: Wampex.publish() | nil, + goodbye: binary() | nil, + realm: Realm.t(), + name: module() | nil, + roles: [module()], + message_queue: [], + request_id: integer(), + requests: [] + } + + ## States + @init "Init" + @established "Established" + @hello "Hello" + @authenticate "Authenticate" + @call "Call" + @yield "Yield" + @register "Register" + @unregister "Unregister" + @subscribe "Subscribe" + @unsubscribe "Unsubscribe" + @publish "Publish" + @error "Error" + @abort "Abort" + @goodbye "Goodbye" + @event "Cancel" + + ## Resources + @handle_init "HandleInit" + @handle_established "HandleEstablished" + @handle_message "HandleMessage" + @handle_hello "HandleHello" + @handle_authenticate "HandleAuthenticate" + @handle_call "HandleCall" + @handle_yield "HandleYield" + @handle_register "HandleRegister" + @handle_unregister "HandleUnregister" + @handle_subscribe "HandleSubscribe" + @handle_unsubscribe "HandleUnsubscribe" + @handle_publish "HandlePublish" + @handle_interrupt "HandleError" + @handle_abort "HandleAbort" + @handle_goodbye "HandleGoodbye" + @handle_cancel "HandleCancel" + + @impl true + def handle_resource( + @handle_init, + _, + @init, + data + ) do + debug("Init") + + {:ok, %SL{data | data: %Sess{data.data | id: Enum.random(0..@max_id)}}, + [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_established, + _, + @established, + data + ) do + debug("Established") + {:ok, data, []} + end + + @impl true + def handle_resource( + @handle_message, + _, + _, + %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data + ) do + debug("Handling Message #{inspect(msg)}") + {actions, id, response} = handle_message(msg, roles) + {%SL{data: sess} = data, response} = maybe_update_response(data, response) + {requests, actions} = resp = handle_response(id, actions, requests, response) + debug("Response: #{inspect(resp)}") + {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions} + end + + @impl true + def handle_resource( + @handle_hello, + _, + @hello, + %SL{ + data: %Sess{id: id, transport: tt, transport_pid: t} + } = sl + ) do + # TODO check for authentication request + tt.send_request(t, Peer.welcome(%Welcome{session_id: id})) + {:ok, sl, [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_register, + _, + @register, + %SL{ + data: %Sess{transport: tt, transport_pid: t, register: r} + } = sl + ) do + # TODO check for authentication request + debug("Handling Registration #{inspect(r)}") + {:ok, sl, [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_subscribe, + _, + @subscribe, + %SL{ + data: %Sess{transport: tt, transport_pid: t, subscribe: s} + } = sl + ) do + # TODO check for authentication request + debug("Handling Subscription #{inspect(s)}") + {:ok, sl, [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource(@handle_abort, _, @abort, data) do + Logger.warn("Aborting: #{data.data.error}") + {:ok, data, []} + end + + @impl true + def handle_resource( + @handle_goodbye, + _, + @goodbye, + %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data + ) do + tt.send_message(t, Peer.goodbye(goodbye)) + {:ok, data, []} + end + + @impl true + def handle_resource(resource, _, state, data) do + Logger.error("No specific resource handler for #{resource} in state #{state}") + {:ok, data, []} + end + + @impl true + def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do + {:ok, %SL{data | data: %Sess{sess | message: message}}, + [{:next_event, :internal, :message_received}]} + end + + defp handle_response(nil, actions, requests, _), do: {requests, actions} + + defp handle_response(id, actions, requests, response) do + Enum.reduce_while(requests, {requests, actions}, fn + {^id, nil}, _ -> + {:halt, {remove_request(id, requests), actions}} + + {^id, from}, _ -> + {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}} + + {_, _}, acc -> + {:cont, acc} + end) + end + + defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do + send(from, {:progress, arg_l, arg_kw}) + [] + end + + defp get_response_action(from, response), do: {:reply, from, response} + + defp maybe_update_response(data, {:update, key, resp}) do + {%SL{data | data: Map.put(data.data, key, resp)}, resp} + end + + defp maybe_update_response(data, resp), do: {data, resp} + + defp do_send(r_id, tt, t, message) do + {request_id, message} = maybe_inject_request_id(r_id, message) + tt.send_request(t, remove_nil_values(message)) + request_id + end + + defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1) + + defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message} + + defp maybe_inject_request_id(r_id, message) do + request_id = get_request_id(r_id) + {request_id, List.insert_at(message, 1, request_id)} + end + + defp get_request_id(current_id) when current_id == @max_id do + 1 + end + + defp get_request_id(current_id) do + current_id + 1 + end + + defp remove_request(id, requests) do + Enum.filter(requests, fn + {^id, _} -> false + {_id, _} -> true + end) + end + + defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs} + + defp send_message_queue(r_id, mq, tt, t, reqs) do + mq + |> Enum.reverse() + |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} -> + r = do_send(id, tt, t, request) + {r, [{r, from} | requests]} + end) + end + + def remove_cast_requests([]), do: [] + + def remove_cast_requests(requests) do + Enum.filter(requests, fn + {_, nil} -> false + {_, _} -> true + end) + end + + defp handle_message(msg, roles) do + Enum.reduce_while(roles, nil, fn r, _ -> + try do + res = r.handle(msg) + {:halt, res} + rescue + FunctionClauseError -> {:cont, nil} + end + end) + end +end diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex new file mode 100644 index 0000000..54ae007 --- /dev/null +++ b/lib/router/transports/web_socket.ex @@ -0,0 +1,76 @@ +defmodule Wampex.Router.Transports.WebSocket do + @moduledoc false + @behaviour :cowboy_websocket + + require Logger + + alias __MODULE__ + alias Wampex.Router.Session + alias Wampex.Serializers.{JSON, MessagePack} + + @protocol_header "sec-websocket-protocol" + @json 'wamp.2.json' + @msgpack "wamp.2.msgpack" + + defstruct [:serializer, :session] + + def send_request(transport, message) do + send(transport, {:send_request, message}) + end + + @impl true + def init(req, _state) do + {:ok, serializer, protocol} = get_serializer(req) + req = :cowboy_req.set_resp_header(@protocol_header, protocol, req) + {:cowboy_websocket, req, %WebSocket{serializer: serializer}} + end + + @impl true + def websocket_init(state) do + {:ok, session} = start_session(state) + Logger.info("Websocket Initialized: #{inspect(state)}") + {:ok, %WebSocket{state | session: session}} + end + + @impl true + def websocket_handle({type, payload}, state) do + Logger.debug("Received #{type} Payload: #{payload}") + handle_payload({payload, state}) + {:ok, state, :hibernate} + end + + @impl true + def websocket_info({:send_request, message}, %WebSocket{serializer: s} = state) do + {[{s.data_type(), s.serialize!(message)}], state, :hibernate} + end + + @impl true + def websocket_info(_event, state) do + {:ok, state} + end + + defp handle_payload({payload, %WebSocket{serializer: s} = state}) do + payload + |> s.deserialize!() + |> handle_event(state) + end + + defp handle_event(ms, %WebSocket{session: s} = state) do + send(s, {:set_message, ms}) + {:ok, state} + end + + defp start_session(_state) do + Session.start_link(%Session{transport: WebSocket, transport_pid: self()}) + end + + defp get_serializer(req) do + @protocol_header + |> :cowboy_req.parse_header(req) + |> parse_protocol() + end + + defp parse_protocol([@json]), do: {:ok, JSON, @json} + defp parse_protocol([@msgpack]), do: {:ok, MessagePack, @msgpack} + defp parse_protocol(test), do: Logger.error("Unknown protocol: #{inspect(test)}") +end diff --git a/lib/wampex.ex b/lib/wampex.ex index eb2bdb4..30a6f1f 100644 --- a/lib/wampex.ex +++ b/lib/wampex.ex @@ -6,6 +6,34 @@ defmodule Wampex do @type message :: nonempty_list(message_part()) @type arg_list :: [] | nonempty_list(any()) @type arg_keyword :: map() + + @type hello :: {:hello, realm :: String.t(), details :: map()} + + @type authenticate :: {:authenticate, signature :: binary(), details :: map()} + + @type call :: + {:call, request_id :: integer(), details :: map(), arg_list :: arg_list(), + arg_keywords :: arg_keyword()} + + @type yield :: + {:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(), + arg_keywords :: arg_keyword()} + + @type publish :: + {:publish, request_id :: integer(), details :: map(), topic :: String.t(), + arg_list :: arg_list(), arg_keywords :: arg_keyword()} + + @type register :: + {:register, request_id :: integer(), details :: map(), procedure :: String.t()} + + @type unregister :: + {:unregister, request_id :: integer(), id :: integer()} + + @type unsubscribe :: + {:unsubscribe, request_id :: integer(), id :: integer()} + + @type subscribe :: + {:subscribe, request_id :: integer(), topic :: String.t(), details :: map()} @type invocation :: {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(), arg_list :: arg_list(), arg_keywords :: arg_keyword()} @@ -18,7 +46,16 @@ defmodule Wampex do arg_keyword :: arg_keyword()} @type handle_response :: {:ok, integer()} + | publish() + | hello() + | authenticate() + | unsubscribe() + | subscribe() | invocation() + | register() + | unregister() + | call() + | yield() | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()} | error() | event() diff --git a/mix.exs b/mix.exs index f28dd69..4de3490 100644 --- a/mix.exs +++ b/mix.exs @@ -37,13 +37,15 @@ defmodule Wampex.MixProject do defp deps do [ - {:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false}, + {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, {:ex_doc, "~> 0.21", only: :dev, runtime: false}, + {:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false}, {:jason, "~> 1.1"}, {:msgpack, "~> 0.7.0"}, {:pbkdf2, "~> 2.0"}, + {:plug_cowboy, "~> 2.1"}, {:states_language, "~> 0.2"}, {:websockex, "~> 0.4"} ] diff --git a/mix.lock b/mix.lock index 5fb72e4..bc5f8f6 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,10 @@ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, + "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"}, "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"}, + "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"}, + "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"}, "credo": {:hex, :credo, "1.2.2", "f57faf60e0a12b0ba9fd4bad07966057fde162b33496c509b95b027993494aab", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8f2623cd8c895a6f4a55ef10f3fdf6a55a9ca7bef09676bd835551687bf8a740"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"}, @@ -17,11 +20,16 @@ "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "msgpack": {:hex, :msgpack, "0.7.0", "128ae0a2227c7e7a2847c0f0f73551c268464f8c1ee96bffb920bc0a5712b295", [:rebar3], [], "hexpm", "4649353da003e6f438d105e4b1e0f17757f6f5ec8687a6f30875ff3ac4ce2a51"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, "pbkdf2": {:hex, :pbkdf2, "2.0.0", "11c23279fded5c0027ab3996cfae77805521d7ef4babde2bd7ec04a9086cf499", [:rebar3], [], "hexpm", "1e793ce6fdb0576613115714deae9dfc1d1537eaba74f07efb36de139774488d"}, + "plug": {:hex, :plug, "1.9.0", "8d7c4e26962283ff9f8f3347bd73838e2413fbc38b7bb5467d5924f68f3a5a4a", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "9902eda2c52ada2a096434682e99a2493f5d06a94d6ac6bcfff9805f952350f1"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.1.2", "8b0addb5908c5238fac38e442e81b6fcd32788eaa03246b4d55d147c47c5805e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7d722581ce865a237e14da6d946f92704101740a256bd13ec91e63c0b122fc70"}, + "plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"}, + "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"}, "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, diff --git a/priv/router.json b/priv/router.json new file mode 100644 index 0000000..a0b99a3 --- /dev/null +++ b/priv/router.json @@ -0,0 +1,147 @@ +{ + "Comment": "Router Session State Machine", + "StartAt": "Init", + "States": { + "Init": { + "Type": "Task", + "Resource": "HandleInit", + "Next": "Established" + }, + "Established": { + "Type": "Choice", + "Resource": "HandleEstablished", + "Choices": [ + { + "StringEquals": ":message_received", + "Next": "Message" + }, + { + "StringEquals": ":goodbye", + "Next": "GoodBye" + }, + { + "StringEquals": ":abort", + "Next": "Abort" + } + ] + }, + "Message": { + "Type": "Choice", + "Resource": "HandleMessage", + "Choices": [ + { + "StringEquals": ":hello", + "Next": "Hello" + }, + { + "StringEquals": ":authenticate", + "Next": "Authenticate" + }, + { + "StringEquals": ":call", + "Next": "Call" + }, + { + "StringEquals": ":yield", + "Next": "Yield" + }, + { + "StringEquals": ":register", + "Next": "Register" + }, + { + "StringEquals": ":unregister", + "Next": "Unregister" + }, + { + "StringEquals": ":subscribe", + "Next": "Subscribe" + }, + { + "StringEquals": ":unsubscribe", + "Next": "Unsubscribe" + }, + { + "StringEquals": ":publish", + "Next": "Publish" + }, + { + "StringEquals": ":error", + "Next": "Error" + }, + { + "StringEquals": ":abort", + "Next": "Abort" + }, + { + "StringEquals": ":goodbye", + "Next": "GoodBye" + }, + { + "StringEquals": ":cancel", + "Next": "Cancel" + } + ] + }, + "Hello": { + "Type": "Task", + "Resource": "HandleHello", + "Next": "Established" + }, + "Authenticate": { + "Type": "Task", + "Resource": "HandleAuthenticate", + "Next": "Established" + }, + "Call": { + "Type": "Task", + "Resource": "HandleCall", + "Next": "Established" + }, + "Yield": { + "Type": "Task", + "Resource": "HandleYield", + "Next": "Established" + }, + "Register":{ + "Type": "Task", + "Resource": "HandleRegister", + "Next": "Established" + }, + "Unregister":{ + "Type": "Task", + "Resource": "HandleUnregister", + "Next": "Established" + }, + "Subscribe":{ + "Type": "Task", + "Resource": "HandleSubscribe", + "Next": "Established" + }, + "Unsubscribe":{ + "Type": "Task", + "Resource": "HandleUnsubscribe", + "Next": "Established" + }, + "Publish":{ + "Type": "Task", + "Resource": "HandlePublish", + "Next": "Established" + }, + "Error":{ + "Type": "Task", + "Resource": "HandleError", + "Next": "Established" + }, + "GoodBye": { + "Type": "Task", + "Resource": "HandleGoodbye", + "End": true + }, + "Abort": { + "Type": "Task", + "Resource": "HandleAbort", + "End": true + } + } +} diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 903f1b5..b9b8ef4 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -4,52 +4,67 @@ defmodule WampexTest do alias Wampex.{Authentication, Realm} alias Wampex.Client - alias Wampex.Roles.{Callee, Caller, Peer, Publisher, Subscriber} + alias Wampex.Roles.{Broker, Callee, Caller, Dealer, Peer, Publisher, Subscriber} + alias Wampex.Router alias Wampex.Serializers.{JSON, MessagePack} + alias Broker.{Event, Published, Subscribed, Unsubscribed} alias Callee.{Register, Unregister, Yield} alias Caller.Call alias Client.Session + alias Dealer.{Invocation, Registered, Result, Unregistered} alias Peer.{Authenticate, Goodbye, Hello} alias Publisher.Publish alias Subscriber.{Subscribe, Unsubscribe} require Logger - @url "ws://localhost:18080/ws" + @url "ws://localhost:4000/ws" @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"} @realm %Realm{name: "com.myrealm"} @roles [Callee, Caller, Publisher, Subscriber] @device "as987d9a8sd79a87ds" + setup_all do + [server: Router.start_link(name: TestRouter, port: 4000)] + end + @session %Session{url: @url, realm: @realm, roles: @roles} + @tag :client test "session_name" do assert Test.Session = Client.session_name(Test) end + @tag :client test "subscriber_registry_name" do assert Test.SubscriberRegistry = Client.subscriber_registry_name(Test) end + @tag :client test "callee_registry_name" do assert Test.CalleeRegistry = Client.callee_registry_name(Test) end + @tag :client test "transport_name" do assert Test.Transport = Client.transport_name(Test) end + @tag :client test "Callee.add" do assert %{callee: %{}} = Callee.add(%{}) end + @tag :client test "Callee.register" do assert [64, %{}, "test"] = Callee.register(%Register{procedure: "test"}) end + @tag :client test "Callee.unregister" do assert [66, 123_456] = Callee.unregister(%Unregister{registration_id: 123_456}) end + @tag :client test "Callee.yield" do assert [70, 1234, %{}, nil, nil] = Callee.yield(%Yield{request_id: 1234}) assert [70, 1234, %{}, ["1"], nil] = Callee.yield(%Yield{request_id: 1234, arg_list: ["1"]}) @@ -63,10 +78,12 @@ defmodule WampexTest do }) end + @tag :client test "Caller.add" do assert %{caller: %{}} = Caller.add(%{}) end + @tag :client test "Caller.call" do assert [48, %{}, "test", nil, nil] = Caller.call(%Call{procedure: "test"}) assert [48, %{}, "test", [1], nil] = Caller.call(%Call{procedure: "test", arg_list: [1]}) @@ -80,28 +97,34 @@ defmodule WampexTest do }) end + @tag :client test "Peer.add" do assert %{} = Caller.add(%{}) end + @tag :client test "Peer.hello" do assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] = Peer.hello(%Hello{realm: "test", roles: [Callee]}) end + @tag :client test "Peer.authenticate" do assert [5, "a-signa-ture", %{}] == Peer.authenticate(%Authenticate{signature: "a-signa-ture", extra: %{}}) end + @tag :client test "Peer.gooodbye" do assert [6, %{}, "test"] = Peer.goodbye(%Goodbye{reason: "test"}) end + @tag :client test "Publisher.add" do assert %{publisher: %{}} = Publisher.add(%{}) end + @tag :client test "Publisher.publish" do assert [16, %{}, "test", nil, nil] = Publisher.publish(%Publish{topic: "test"}) assert [16, %{}, "test", [1], nil] = Publisher.publish(%Publish{topic: "test", arg_list: [1]}) @@ -115,19 +138,114 @@ defmodule WampexTest do }) end + @tag :client test "Subscriber.add" do assert %{subscriber: %{}} = Subscriber.add(%{}) end + @tag :client test "Subscriber.subscribe" do assert [32, %{test: 1}, "test"] = Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}}) end + @tag :client test "Subscriber.unsubscribe" do assert [34, 1234] = Subscriber.unsubscribe(%Unsubscribe{subscription_id: 1234}) end + @tag :router + test "Dealer.add" do + assert %{dealer: %{}} = Dealer.add(%{}) + end + + @tag :router + test "Dealer.registered" do + assert [65, 123_456, 765_432] = + Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432}) + end + + @tag :router + test "Dealer.unregistered" do + assert [67, 123_456] = Dealer.unregistered(%Unregistered{request_id: 123_456}) + end + + @tag :router + test "Dealer.result" do + assert [50, 1234, %{}, nil, nil] = Dealer.result(%Result{request_id: 1234}) + assert [50, 1234, %{}, ["1"], nil] = Dealer.result(%Result{request_id: 1234, arg_list: ["1"]}) + + assert [50, 1234, %{test: 1}, [2], %{}] = + Dealer.result(%Result{ + request_id: 1234, + arg_list: [2], + arg_kw: %{}, + options: %{test: 1} + }) + end + + @tag :router + test "Dealer.invocation" do + assert [68, 1234, 1234, %{}, nil, nil] = + Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234}) + + assert [68, 1234, 1234, %{}, ["1"], nil] = + Dealer.invocation(%Invocation{ + request_id: 1234, + registration_id: 1234, + arg_list: ["1"] + }) + + assert [68, 1234, 1234, %{test: 1}, [2], %{test: 2}] = + Dealer.invocation(%Invocation{ + request_id: 1234, + registration_id: 1234, + arg_list: [2], + arg_kw: %{test: 2}, + options: %{test: 1} + }) + end + + @tag :router + test "Broker.add" do + assert %{broker: %{}} = Broker.add(%{}) + end + + @tag :router + test "Broker.event" do + assert [36, 123_456, 765_432, %{}, nil, nil] = + Broker.event(%Event{subscription_id: 123_456, publication_id: 765_432}) + + assert [36, 123_456, 765_432, %{}, [2], nil] = + Broker.event(%Event{subscription_id: 123_456, publication_id: 765_432, arg_list: [2]}) + + assert [36, 123_456, 765_432, %{}, [2], %{test: 1}] = + Broker.event(%Event{ + subscription_id: 123_456, + publication_id: 765_432, + arg_list: [2], + arg_kw: %{test: 1} + }) + end + + @tag :router + test "Broker.subscribed" do + assert [33, 123_456, 123_456] = + Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456}) + end + + @tag :router + test "Broker.unsubscribed" do + assert [35, 123_456] = Broker.unsubscribed(%Unsubscribed{request_id: 123_456}) + end + + @tag :router + test "Broker.published" do + assert [17, 123_456, 654_321] = + Broker.published(%Published{request_id: 123_456, publication_id: 654_321}) + end + + @tag :client test "MessagePack Serializer" do assert :binary = MessagePack.data_type() assert "\x93\x05\xC4\t123456789\x80" = MessagePack.serialize!([5, "123456789", %{}]) @@ -136,6 +254,7 @@ defmodule WampexTest do assert {:ok, [5, "123456789", %{}]} = MessagePack.deserialize("\x93\x05\xC4\t123456789\x80") end + @tag :client test "JSON Serializer" do assert :text = JSON.data_type() assert "[5,\"123456789\",{}]" = JSON.serialize!([5, "123456789", %{}]) @@ -144,6 +263,7 @@ defmodule WampexTest do assert {:ok, [5, "123456789", %{}]} = JSON.deserialize("[5,\"123456789\",{}]") end + @tag :client test "callee registration" do name = TestCalleeRegistration Client.start_link(name: name, session: @session) @@ -151,6 +271,7 @@ defmodule WampexTest do assert_receive {:registered, id} end + @tag :client test "authentication" do callee_name = TestCalleeRespondAuthenticated session = %Session{@session | realm: %Realm{@session.realm | authentication: @auth}} @@ -159,6 +280,7 @@ defmodule WampexTest do assert_receive {:registered, id} end + @tag :client test "abort" do Process.flag(:trap_exit, true) callee_name = TestAbort @@ -167,6 +289,7 @@ defmodule WampexTest do assert_receive {:EXIT, ^pid, :shutdown} end + @tag :client test "callee is invoked and responds and caller gets result" do callee_name = TestCalleeRespond Client.start_link(name: callee_name, session: @session) @@ -188,6 +311,7 @@ defmodule WampexTest do assert_receive {:invocation, _, _, _, _, _} end + @tag :client test "subscriber registration" do name = TestSubscriberRegister Client.start_link(name: name, session: @session) @@ -195,6 +319,7 @@ defmodule WampexTest do assert_receive {:subscribed, id} end + @tag :client test "subscriber receives events from publisher" do name = TestSubscriberEvents Client.start_link(name: name, session: @session)