From: Christopher Date: Fri, 20 Mar 2020 21:43:00 +0000 (-0500) Subject: remove bondy_config X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=10435decdc787e17bfd73a9911f386a4a2f30a66;p=wampex_router.git remove bondy_config --- diff --git a/bondy_config/bondy.conf b/bondy_config/bondy.conf deleted file mode 100644 index 79ed8ca..0000000 --- a/bondy_config/bondy.conf +++ /dev/null @@ -1,7 +0,0 @@ -distributed_cookie = bondy -nodename = bondy@127.0.0.1 -aae.data_exchange_timeout = 1m -aae.enabled = on -security.allow_anonymous_user = on -security.automatically_create_realms = off -security.config_file = $(platform_etc_dir)/security_config.json diff --git a/bondy_config/security_config.json b/bondy_config/security_config.json deleted file mode 100644 index 1e148fd..0000000 --- a/bondy_config/security_config.json +++ /dev/null @@ -1,36 +0,0 @@ -[ - { - "uri": "com.myrealm", - "description": "A test realm", - "authmethods": [ - "wampcra" - ], - "security_enabled": true, - "users": [], - "sources": [ - { - "usernames": "all", - "authmethod": "password", - "cidr": "0.0.0.0/0", - "meta": { - "description": "Allows all users from any network authenticate using password credentials." - } - } - ], - "grants": [ - { - "permissions": [ - "wamp.register", - "wamp.unregister", - "wamp.subscribe", - "wamp.unsubscribe", - "wamp.call", - "wamp.cancel", - "wamp.publish" - ], - "uri": "*", - "roles": "all" - } - ] - } -] diff --git a/lib/authentication.ex b/lib/authentication.ex deleted file mode 100644 index 419fb2d..0000000 --- a/lib/authentication.ex +++ /dev/null @@ -1,32 +0,0 @@ -defmodule Wampex.Authentication do - @moduledoc false - - alias Wampex.Crypto - - @enforce_keys [:authid, :authmethods, :secret] - defstruct [:authid, :authmethods, :secret] - - @callback handle(challenge :: binary(), auth :: __MODULE__.t()) :: {binary(), map()} - - @type t :: %__MODULE__{ - authid: binary(), - authmethods: [binary()], - secret: binary() - } - - def handle( - {"wampcra", %{"challenge" => ch, "salt" => salt, "iterations" => it, "keylen" => len}}, - auth - ) do - {auth - |> Map.get(:secret) - |> Crypto.pbkdf2(salt, it, len) - |> Crypto.hash_challenge(ch), %{}} - end - - def handle({"wampcra", %{"challenge" => ch}}, auth) do - {auth - |> Map.get(:secret) - |> Crypto.hash_challenge(ch), %{}} - end -end diff --git a/lib/client.ex b/lib/client.ex deleted file mode 100644 index b00a29a..0000000 --- a/lib/client.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Wampex.Client do - @moduledoc """ - Documentation for Wampex. - """ - use Supervisor - - alias Wampex.Client.Session, as: Sess - alias Wampex.Roles.{Callee, Subscriber} - alias Wampex.Roles.Callee.Register - alias Wampex.Roles.Subscriber.Subscribe - - @spec start_link(name: atom(), session_data: Sess.t()) :: - {:ok, pid()} - | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} - def start_link(name: name, session: session_data) when is_atom(name) do - Supervisor.start_link(__MODULE__, {name, session_data}, name: name) - end - - @spec init({atom(), Sess.t()}) :: - {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore - def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do - session = session_name(name) - subscriber_registry = subscriber_registry_name(name) - callee_registry = callee_registry_name(name) - transport = transport_name(name) - session_data = %Sess{session_data | name: name, transport_pid: transport} - - children = [ - {Sess, [{:local, session}, session_data, []]}, - {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]}, - {Registry, - [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]}, - {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]} - ] - - Supervisor.init(children, strategy: :one_for_all, max_restarts: 0) - end - - @spec session_name(module()) :: module() - def session_name(name), do: Module.concat([name, Session]) - @spec subscriber_registry_name(module()) :: module() - def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry]) - @spec callee_registry_name(module()) :: module() - def callee_registry_name(name), do: Module.concat([name, CalleeRegistry]) - @spec transport_name(module()) :: module() - def transport_name(name), do: Module.concat([name, Transport]) - - @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) :: - {:ok, integer()} - def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do - {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout) - Registry.register(subscriber_registry_name(name), id, t) - {:ok, id} - end - - @spec register(name :: module(), register :: Register.t(), timeout :: integer()) :: - {:ok, integer()} - def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do - {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout) - Registry.register(callee_registry_name(name), id, p) - {:ok, id} - end - - @spec send_request(name :: module(), request :: Wampex.message(), timeout :: integer()) :: - term() - def send_request(name, request, timeout \\ 5000) do - Sess.send_request(session_name(name), request, timeout) - end - - @spec cast_send_request(name :: module(), Wampex.message()) :: :ok - def cast_send_request(name, request) do - Sess.cast_send_request(session_name(name), request) - end -end diff --git a/lib/client/session.ex b/lib/client/session.ex deleted file mode 100644 index 5c55a1d..0000000 --- a/lib/client/session.ex +++ /dev/null @@ -1,451 +0,0 @@ -defmodule Wampex.Client.Session do - @moduledoc """ - A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation - """ - use StatesLanguage, data: "priv/client.json" - - @max_id 9_007_199_254_740_992 - - alias StatesLanguage, as: SL - alias Wampex.Realm - alias Wampex.Roles.Peer - alias Wampex.Roles.Peer.{Authenticate, Hello} - alias Wampex.Serializers.MessagePack - alias __MODULE__, as: Sess - alias Wampex.Client - alias Wampex.Client.Transports.WebSocket - - @yield 70 - @hello 1 - @abort 3 - - @enforce_keys [:url, :roles] - - defstruct [ - :id, - :url, - :transport_pid, - :message, - :event, - :invocation, - :goodbye, - :error, - :realm, - :name, - :roles, - :challenge, - :interrupt, - message_queue: [], - request_id: 0, - protocol: "wamp.2.msgpack", - transport: WebSocket, - serializer: MessagePack, - requests: [] - ] - - @type t :: %__MODULE__{ - id: integer() | nil, - url: binary(), - transport_pid: pid() | module() | nil, - message: Wampex.message() | nil, - event: Wampex.event() | nil, - invocation: Wampex.invocation() | nil, - goodbye: binary() | nil, - realm: Realm.t(), - name: module() | nil, - roles: [module()], - challenge: {binary(), binary()} | nil, - interrupt: integer() | nil, - message_queue: [], - request_id: integer(), - protocol: binary(), - transport: module(), - serializer: module(), - requests: [] - } - - ## States - @established "Established" - @init "Init" - @challenge "Challenge" - @abort "Abort" - @goodbye "Goodbye" - @event "Event" - @invocation "Invocation" - @interrupt "Interrupt" - @handshake "Handshake" - - ## Resources - @init_transport "InitTransport" - @wait_transport "WaitForTransport" - @hello "Hello" - @handle_challenge "HandleChallenge" - @handle_established "HandleEstablished" - @handle_abort "HandleAbort" - @handle_goodbye "HandleGoodbye" - @handle_event "HandleEvent" - @handle_invocation "HandleInvocation" - @handle_message "HandleMessage" - @handle_interrupt "HandleInterrupt" - @handle_handshake "HandleHandshake" - - @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) :: - :ok - def cast_send_request(name, request, pid) do - __MODULE__.cast(name, {:send_request, request, pid}) - end - - @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok - def cast_send_request(name, request) do - __MODULE__.cast(name, {:send_request, request}) - end - - @spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) :: - term() - def send_request(name, request, timeout) do - __MODULE__.call(name, {:send_request, request}, timeout) - end - - @impl true - def handle_call( - {:send_request, request}, - from, - @established, - %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data - ) do - request_id = do_send(r_id, tt, t, request) - - {:ok, - %SL{ - data - | data: %Sess{ - sess - | request_id: request_id, - requests: [{request_id, from} | sess.requests] - } - }, []} - end - - @impl true - def handle_call( - {:send_request, request}, - from, - _, - %SL{data: %Sess{message_queue: mq} = sess} = data - ) do - Logger.debug("Queueing request: #{inspect(request)}") - {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []} - end - - @impl true - def handle_cast( - {:send_request, request, from}, - @established, - %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data - ) do - request_id = do_send(r_id, tt, t, request) - - {:ok, - %SL{ - data - | data: %Sess{ - sess - | request_id: request_id, - requests: [{request_id, from} | sess.requests] - } - }, []} - end - - @impl true - def handle_cast( - {:send_request, request}, - @established, - %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data - ) do - request_id = do_send(r_id, tt, t, request) - - {:ok, - %SL{ - data - | data: %Sess{sess | request_id: request_id} - }, []} - end - - @impl true - def handle_cast( - {:send_request, request}, - _, - %SL{data: %Sess{message_queue: mq} = sess} = data - ) do - Logger.debug("Queueing request: #{inspect(request)}") - {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []} - end - - @impl true - def handle_resource( - @init_transport, - _, - @wait_transport, - data - ) do - Logger.debug("Waiting for transport to connect...") - {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []} - end - - @impl true - def handle_resource( - @hello, - _, - @init, - %SL{ - data: %Sess{ - transport: tt, - transport_pid: t, - roles: roles, - realm: %Realm{name: realm, authentication: auth} - } - } = data - ) do - message = maybe_authentication(%Hello{realm: realm, roles: roles}, auth) - tt.send_request(t, Peer.hello(message)) - {:ok, data, [{:next_event, :internal, :hello_sent}]} - end - - @impl true - def handle_resource( - @handle_challenge, - _, - @challenge, - %SL{ - data: %Sess{ - transport: tt, - transport_pid: t, - challenge: challenge, - realm: %Realm{authentication: auth} - } - } = sl - ) do - {signature, extra} = auth.__struct__.handle(challenge, auth) - tt.send_request(t, Peer.authenticate(%Authenticate{signature: signature, extra: extra})) - {:ok, sl, [{:next_event, :internal, :challenged}]} - end - - @impl true - def handle_resource( - @goodbye, - _, - @handle_goodbye, - %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data - ) do - tt.send_message(t, Peer.goodbye(goodbye)) - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_message, - _, - _, - %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data - ) do - Logger.debug("Handling Message #{inspect(msg)}") - {actions, id, response} = handle_message(msg, roles) - {%SL{data: sess} = data, response} = maybe_update_response(data, response) - {requests, actions} = resp = handle_response(id, actions, requests, response) - Logger.debug("Response: #{inspect(resp)}") - {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions} - end - - @impl true - def handle_resource(@handle_abort, _, @abort, data) do - Logger.warn("Aborting: #{data.data.error}") - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_established, - _, - @established, - %SL{ - data: - %Sess{ - request_id: r_id, - transport: tt, - transport_pid: t, - message_queue: mq, - requests: reqs - } = sess - } = data - ) do - {request_id, requests} = send_message_queue(r_id, mq, tt, t, reqs) - requests = remove_cast_requests(requests) - - {:ok, - %SL{ - data - | data: %Sess{sess | requests: requests, request_id: request_id, message_queue: []} - }, []} - end - - @impl true - def handle_resource(@handle_handshake, _, @handshake, data) do - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_event, - _, - @event, - %SL{data: %Sess{name: name, event: event}} = sl - ) do - Logger.debug("Received Event #{inspect(event)}") - - sub = Client.subscriber_registry_name(name) - - Registry.dispatch(sub, elem(event, 1), fn entries -> - for {pid, _topic} <- entries, do: send(pid, event) - end) - - {:ok, sl, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_invocation, - _, - @invocation, - %SL{data: %Sess{name: name, invocation: invocation}} = sl - ) do - Logger.debug("Received Invocation #{inspect(invocation)}") - reg = Client.callee_registry_name(name) - - Registry.dispatch(reg, elem(invocation, 2), fn entries -> - for {pid, _procedure} <- entries, do: send(pid, invocation) - end) - - {:ok, sl, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_interrupt, - _, - @interrupt, - %SL{data: %Sess{interrupt: {id, opts}, name: name}} = data - ) do - [{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id) - send(callee, {:interrupt, id, opts}) - {:ok, data, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource(resource, _, state, data) do - Logger.error("No specific resource handler for #{resource} in state #{state}") - {:ok, data, []} - end - - @impl true - def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do - {:ok, %SL{data | data: %Sess{sess | message: message}}, - [{:next_event, :internal, :message_received}]} - end - - defp handle_response(nil, actions, requests, _), do: {requests, actions} - - defp handle_response(id, actions, requests, response) do - Enum.reduce_while(requests, {requests, actions}, fn - {^id, nil}, _ -> - {:halt, {remove_request(id, requests), actions}} - - {^id, from}, _ -> - {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}} - - {_, _}, acc -> - {:cont, acc} - end) - end - - defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do - send(from, {:progress, arg_l, arg_kw}) - [] - end - - defp get_response_action(from, response), do: {:reply, from, response} - - defp maybe_update_response(data, {:update, key, resp}) do - {%SL{data | data: Map.put(data.data, key, resp)}, resp} - end - - defp maybe_update_response(data, resp), do: {data, resp} - - defp do_send(r_id, tt, t, message) do - {request_id, message} = maybe_inject_request_id(r_id, message) - tt.send_request(t, remove_nil_values(message)) - request_id - end - - defp maybe_authentication(message, nil), do: message - - defp maybe_authentication(%Hello{} = message, auth) do - %Hello{message | options: Map.from_struct(auth)} - end - - defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1) - - defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message} - defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message} - defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message} - - defp maybe_inject_request_id(r_id, message) do - request_id = get_request_id(r_id) - {request_id, List.insert_at(message, 1, request_id)} - end - - defp get_request_id(current_id) when current_id == @max_id do - 1 - end - - defp get_request_id(current_id) do - current_id + 1 - end - - defp remove_request(id, requests) do - Enum.filter(requests, fn - {^id, _} -> false - {_id, _} -> true - end) - end - - defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs} - - defp send_message_queue(r_id, mq, tt, t, reqs) do - mq - |> Enum.reverse() - |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} -> - r = do_send(id, tt, t, request) - {r, [{r, from} | requests]} - end) - end - - def remove_cast_requests([]), do: [] - - def remove_cast_requests(requests) do - Enum.filter(requests, fn - {_, nil} -> false - {_, _} -> true - end) - end - - defp handle_message(msg, roles) do - Enum.reduce_while(roles, nil, fn r, _ -> - try do - res = r.handle(msg) - {:halt, res} - rescue - FunctionClauseError -> {:cont, nil} - end - end) - end -end diff --git a/lib/client/transports/web_socket.ex b/lib/client/transports/web_socket.ex deleted file mode 100644 index b0c3d85..0000000 --- a/lib/client/transports/web_socket.ex +++ /dev/null @@ -1,78 +0,0 @@ -defmodule Wampex.Client.Transports.WebSocket do - use WebSockex - - @behaviour Wampex.Transport - - alias WebSockex.Conn - - require Logger - - @header "Sec-WebSocket-Protocol" - @ping_interval 20_000 - - @impl true - def send_request(t, data) do - WebSockex.cast(t, {:send_request, data}) - end - - @impl true - def start_link( - url: url, - session: session, - protocol: protocol, - serializer: serializer, - opts: opts - ) do - url - |> Conn.new(extra_headers: [{@header, protocol}]) - |> WebSockex.start_link( - __MODULE__, - %{session: session, serializer: serializer, connected: false}, - opts - ) - end - - @impl true - def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do - Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}") - Process.send_after(self(), :ping, @ping_interval) - send(state.session, {:connected, true}) - {:ok, %{state | connected: true}} - end - - @impl true - def handle_ping(_ping, state) do - {:reply, :pong, state} - end - - @impl true - def handle_pong(_pong, state) do - {:ok, state} - end - - @impl true - def handle_cast({:send_request, data}, %{serializer: s} = state) do - Logger.debug("Sending Request #{inspect(data)}") - {:reply, {s.data_type(), s.serialize!(data)}, state} - end - - @impl true - def handle_cast({:connected, resp}, state) do - send(resp, {:connected, state.connected}) - {:ok, state} - end - - @impl true - def handle_info(:ping, state) do - Process.send_after(self(), :ping, @ping_interval) - {:reply, :ping, state} - end - - @impl true - def handle_frame({type, message}, %{serializer: s} = state) do - message = s.deserialize!(message) - Logger.debug("Received #{type} data: #{inspect(message)}") - send(state.session, {:set_message, message}) - {:ok, state} - end -end diff --git a/lib/crypto.ex b/lib/crypto.ex deleted file mode 100644 index b41c1b4..0000000 --- a/lib/crypto.ex +++ /dev/null @@ -1,19 +0,0 @@ -defmodule Wampex.Crypto do - @moduledoc false - def hash_challenge(key, data) do - :hmac - |> :crypto.mac(:sha256, key, data) - |> Base.encode64() - end - - def pbkdf2(secret, salt, iterations, keylen) do - {:ok, derived} = :pbkdf2.pbkdf2(:sha256, secret, salt, iterations, keylen) - Base.encode64(derived) - end - - def random_string(length) do - length - |> :crypto.strong_rand_bytes() - |> Base.encode64() - end -end diff --git a/lib/realm.ex b/lib/realm.ex deleted file mode 100644 index d218bbd..0000000 --- a/lib/realm.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule Wampex.Realm do - @moduledoc "Defines the WAMP Realm" - alias Wampex.Authentication - - @enforce_keys [:name] - defstruct [:name, :authentication] - - @type t :: %__MODULE__{ - name: binary(), - authentication: Authentication.t() | nil - } -end diff --git a/lib/role.ex b/lib/role.ex deleted file mode 100644 index 5e68ce8..0000000 --- a/lib/role.ex +++ /dev/null @@ -1,6 +0,0 @@ -defmodule Wampex.Role do - @moduledoc "Behaviour for Roles" - @callback add(map()) :: map() - @callback handle(message :: Wampex.message()) :: - {[:gen_statem.action()], integer() | nil, Wampex.handle_response()} -end diff --git a/lib/roles/broker.ex b/lib/roles/broker.ex deleted file mode 100644 index 89036b4..0000000 --- a/lib/roles/broker.ex +++ /dev/null @@ -1,120 +0,0 @@ -defmodule Wampex.Roles.Broker do - @moduledoc """ - Handles requests and responses for a Broker - """ - alias Wampex.Role - @behaviour Role - - @publish 16 - @published 17 - @subscribe 32 - @subscribed 33 - @unsubscribe 34 - @unsubscribed 35 - @event 36 - - defmodule Event do - @moduledoc false - @enforce_keys [:subscription_id, :publication_id] - defstruct [:subscription_id, :publication_id, arg_list: [], arg_kw: %{}, options: %{}] - - @type t :: %__MODULE__{ - subscription_id: integer(), - publication_id: integer(), - arg_list: list(any()), - arg_kw: map(), - options: map() - } - end - - defmodule Subscribed do - @moduledoc false - @enforce_keys [:request_id, :subscription_id] - defstruct [:request_id, :subscription_id] - - @type t :: %__MODULE__{ - request_id: integer(), - subscription_id: integer() - } - end - - defmodule Unsubscribed do - @moduledoc false - @enforce_keys [:request_id] - defstruct [:request_id] - - @type t :: %__MODULE__{ - request_id: integer() - } - end - - defmodule Published do - @moduledoc false - @enforce_keys [:request_id, :publication_id] - defstruct [:request_id, :publication_id] - - @type t :: %__MODULE__{ - request_id: integer(), - publication_id: integer() - } - end - - @impl true - def add(roles) do - Map.put(roles, :broker, %{}) - end - - @spec event(Event.t()) :: Wampex.message() - def event(%Event{ - subscription_id: si, - publication_id: pi, - arg_list: al, - arg_kw: akw, - options: opts - }) do - [@event, si, pi, opts, al, akw] - end - - @spec subscribed(Subscribed.t()) :: Wampex.message() - def subscribed(%Subscribed{request_id: ri, subscription_id: si}) do - [@subscribed, ri, si] - end - - @spec unsubscribed(Unsubscribed.t()) :: Wampex.message() - def unsubscribed(%Unsubscribed{request_id: ri}) do - [@unsubscribed, ri] - end - - @spec published(Published.t()) :: Wampex.message() - def published(%Published{request_id: ri, publication_id: pi}) do - [@published, ri, pi] - end - - @impl true - def handle([@subscribe, request_id, opts, topic]) do - {[{:next_event, :internal, :subscribe}], request_id, - {:update, :subscribe, {:subscribe, request_id, opts, topic}}} - end - - @impl true - def handle([@unsubscribe, request_id, id]) do - {[{:next_event, :internal, :unsubscribe}], request_id, - {:update, :unsubscribe, {:unsubscribe, request_id, id}}} - end - - @impl true - def handle([@publish, id, opts, topic]) do - handle([@publish, id, opts, topic, [], %{}]) - end - - @impl true - def handle([@publish, id, opts, topic, arg_l]) do - handle([@publish, id, opts, topic, arg_l, %{}]) - end - - @impl true - def handle([@publish, id, opts, topic, arg_l, arg_kw]) do - {[{:next_event, :internal, :publish}], id, - {:update, :publish, {:publish, id, opts, topic, arg_l, arg_kw}}} - end -end diff --git a/lib/roles/callee.ex b/lib/roles/callee.ex deleted file mode 100644 index 54b0fd0..0000000 --- a/lib/roles/callee.ex +++ /dev/null @@ -1,101 +0,0 @@ -defmodule Wampex.Roles.Callee do - @moduledoc """ - Handles requests and responses for a Callee - """ - - alias Wampex.Role - @behaviour Role - - @register 64 - @registered 65 - @unregister 66 - @unregistered 67 - @invocation 68 - @yield 70 - @interrupt 69 - - defmodule Register do - @moduledoc false - @enforce_keys [:procedure] - defstruct [:procedure, options: %{}] - - @type t :: %__MODULE__{ - procedure: binary(), - options: map() - } - end - - defmodule Unregister do - @moduledoc false - @enforce_keys [:registration_id] - defstruct [:registration_id] - - @type t :: %__MODULE__{ - registration_id: integer() - } - end - - defmodule Yield do - @moduledoc false - @enforce_keys [:request_id] - defstruct [:request_id, arg_list: [], arg_kw: %{}, options: %{}] - - @type t :: %__MODULE__{ - request_id: integer(), - arg_list: list(any()), - arg_kw: map(), - options: map() - } - end - - @impl true - def add(roles) do - Map.put(roles, :callee, %{}) - end - - @spec register(Register.t()) :: Wampex.message() - def register(%Register{procedure: p, options: opts}) do - [@register, opts, p] - end - - @spec unregister(Unregister.t()) :: Wampex.message() - def unregister(%Unregister{registration_id: ri}) do - [@unregister, ri] - end - - @spec yield(Yield.t()) :: Wampex.message() - def yield(%Yield{request_id: ri, arg_list: al, arg_kw: akw, options: opts}) do - [@yield, ri, opts, al, akw] - end - - @impl true - def handle([@unregistered, request_id]) do - {[{:next_event, :internal, :established}], request_id, {:ok, request_id}} - end - - @impl true - def handle([@registered, request_id, id]) do - {[{:next_event, :internal, :established}], request_id, {:ok, id}} - end - - @impl true - def handle([@invocation, id, reg_id, dets]) do - handle([@invocation, id, reg_id, dets, [], %{}]) - end - - @impl true - def handle([@invocation, id, reg_id, dets, arg_l]) do - handle([@invocation, id, dets, reg_id, arg_l, %{}]) - end - - @impl true - def handle([@invocation, id, reg_id, dets, arg_l, arg_kw]) do - {[{:next_event, :internal, :invocation}], id, - {:update, :invocation, {:invocation, id, reg_id, dets, arg_l, arg_kw}}} - end - - @impl true - def handle([@interrupt, id, opts]) do - {[{:next_event, :internal, :interrupt}], id, {:update, :interrupt, {id, opts}}} - end -end diff --git a/lib/roles/caller.ex b/lib/roles/caller.ex deleted file mode 100644 index 068eb28..0000000 --- a/lib/roles/caller.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Wampex.Roles.Caller do - @moduledoc """ - Handles requests and responses for a Caller - """ - - alias Wampex.Role - alias Wampex.Roles.Peer.Error - @behaviour Role - - @call 48 - @cancel 49 - @error 8 - @result 50 - - defmodule Call do - @moduledoc false - @enforce_keys [:procedure] - - defstruct [:procedure, arg_list: [], arg_kw: %{}, options: %{}] - - @type t :: %__MODULE__{ - procedure: binary(), - arg_list: list(any()), - arg_kw: map(), - options: map() - } - end - - defmodule Cancel do - @moduledoc false - @enforce_keys [:request_id] - defstruct [:request_id, options: %{}] - - @type t :: %__MODULE__{ - request_id: integer(), - options: map() - } - end - - @impl true - def add(roles) do - Map.put(roles, :caller, %{}) - end - - @spec call(Call.t()) :: Wampex.message() - def call(%Call{procedure: p, arg_list: al, arg_kw: akw, options: opts}) do - [@call, opts, p, al, akw] - end - - @spec call_error(Error.t()) :: Wampex.message() - def call_error(%Error{request_id: rid, error: er, details: dets, arg_l: al, arg_kw: akw}) do - [@error, @call, rid, dets, er, al, akw] - end - - @spec cancel(Cancel.t()) :: Wampex.message() - def cancel(%Cancel{request_id: ri, options: opts}) do - [@cancel, ri, opts] - end - - @impl true - def handle([@result, id, dets]) do - handle([@result, id, dets, [], %{}]) - end - - @impl true - def handle([@result, id, dets, arg_l]) do - handle([@result, id, dets, arg_l, %{}]) - end - - @impl true - def handle([@result, id, dets, arg_l, arg_kw]) do - {[{:next_event, :internal, :established}], id, {:ok, dets, arg_l, arg_kw}} - end -end diff --git a/lib/roles/dealer.ex b/lib/roles/dealer.ex deleted file mode 100644 index d8e9ae0..0000000 --- a/lib/roles/dealer.ex +++ /dev/null @@ -1,143 +0,0 @@ -defmodule Wampex.Roles.Dealer do - @moduledoc """ - Handles requests and responses for a Dealer - """ - alias Wampex.Role - @behaviour Role - - @call 48 - @result 50 - @register 64 - @registered 65 - @unregister 66 - @unregistered 67 - @invocation 68 - @yield 70 - - defmodule Registered do - @moduledoc false - @enforce_keys [:request_id, :registration_id] - defstruct [:request_id, :registration_id] - - @type t :: %__MODULE__{ - request_id: integer(), - registration_id: integer() - } - end - - defmodule Unregistered do - @moduledoc false - @enforce_keys [:request_id] - defstruct [:request_id] - - @type t :: %__MODULE__{ - request_id: integer() - } - end - - defmodule Result do - @moduledoc false - @enforce_keys [:request_id] - defstruct [:request_id, arg_list: [], arg_kw: %{}, options: %{}] - - @type t :: %__MODULE__{ - request_id: integer(), - arg_list: list(any()), - arg_kw: map(), - options: map() - } - end - - defmodule Invocation do - @moduledoc false - @enforce_keys [:request_id, :registration_id] - defstruct [:request_id, :registration_id, arg_list: [], arg_kw: %{}, options: %{}] - - @type t :: %__MODULE__{ - request_id: integer(), - registration_id: integer(), - arg_list: list(any()), - arg_kw: map(), - options: map() - } - end - - @impl true - def add(roles) do - Map.put(roles, :dealer, %{}) - end - - @spec registered(Registered.t()) :: Wampex.message() - def registered(%Registered{request_id: ri, registration_id: reg_id}) do - [@registered, ri, reg_id] - end - - @spec unregistered(Unregistered.t()) :: Wampex.message() - def unregistered(%Unregistered{request_id: ri}) do - [@unregistered, ri] - end - - @spec invocation(Invocation.t()) :: Wampex.message() - def invocation(%Invocation{ - request_id: ri, - registration_id: reg_id, - options: opts, - arg_list: al, - arg_kw: akw - }) do - [@invocation, ri, reg_id, opts, al, akw] - end - - @spec result(Result.t()) :: Wampex.message() - def result(%Result{ - request_id: ri, - arg_list: al, - arg_kw: akw, - options: opts - }) do - [@result, ri, opts, al, akw] - end - - @impl true - def handle([@unregister, request_id, registration_id]) do - {[{:next_event, :internal, :unregister}], request_id, - {:update, :unregister, {:unregister, request_id, registration_id}}} - end - - @impl true - def handle([@register, request_id, opts, procedure]) do - {[{:next_event, :internal, :register}], request_id, - {:update, :register, {:register, request_id, opts, procedure}}} - end - - @impl true - def handle([@call, id, dets, proc]) do - handle([@call, id, dets, proc, [], %{}]) - end - - @impl true - def handle([@call, id, dets, proc, arg_l]) do - handle([@call, id, dets, proc, arg_l, %{}]) - end - - @impl true - def handle([@call, id, dets, proc, arg_l, arg_kw]) do - {[{:next_event, :internal, :call}], id, - {:update, :call, {:call, id, dets, proc, arg_l, arg_kw}}} - end - - @impl true - def handle([@yield, id, dets]) do - handle([@yield, id, dets, [], %{}]) - end - - @impl true - def handle([@yield, id, dets, arg_l]) do - handle([@yield, id, dets, arg_l, %{}]) - end - - @impl true - def handle([@yield, id, dets, arg_l, arg_kw]) do - {[{:next_event, :internal, :yield}], id, {:update, :yield, {:yield, id, dets, arg_l, arg_kw}}} - end -end diff --git a/lib/roles/peer.ex b/lib/roles/peer.ex deleted file mode 100644 index 23552fe..0000000 --- a/lib/roles/peer.ex +++ /dev/null @@ -1,179 +0,0 @@ -defmodule Wampex.Roles.Peer do - @moduledoc """ - Handles requests and responses for low-level Peer/Session interactions - """ - alias Wampex.Role - @behaviour Role - - @hello 1 - @welcome 2 - @abort 3 - @challenge 4 - @authenticate 5 - @goodbye 6 - @error 8 - - defmodule Hello do - @moduledoc false - @enforce_keys [:realm, :roles] - defstruct [:realm, :roles, options: %{}, agent: "WAMPex"] - - @type t :: %__MODULE__{ - realm: binary(), - roles: nonempty_list(module()), - agent: binary(), - options: map() - } - end - - defmodule Welcome do - @moduledoc false - @enforce_keys [:session_id] - defstruct [:session_id, options: %{}] - - @type t :: %__MODULE__{ - session_id: integer(), - options: map() - } - end - - defmodule Challenge do - @moduledoc false - @enforce_keys [:auth_method] - defstruct [:auth_method, options: %{}] - - @type t :: %__MODULE__{ - auth_method: String.t(), - options: %{} - } - end - - defmodule Goodbye do - @moduledoc false - @enforce_keys [:reason] - defstruct [:reason, options: %{}] - - @type t :: %__MODULE__{ - reason: binary(), - options: map() - } - end - - defmodule Authenticate do - @moduledoc false - @enforce_keys [:signature, :extra] - defstruct [:signature, :extra] - - @type t :: %__MODULE__{ - signature: binary(), - extra: map() - } - end - - defmodule Abort do - @moduledoc false - @enforce_keys [:reason] - defstruct [:reason, details: %{}] - - @type t :: %__MODULE__{ - reason: String.t(), - details: map() - } - end - - defmodule Error do - @moduledoc false - @enforce_keys [:error] - defstruct [:request_id, :error, arg_l: [], arg_kw: %{}, details: %{}] - - @type t :: %__MODULE__{ - request_id: integer() | nil, - error: String.t(), - arg_l: list(), - arg_kw: map(), - details: map() - } - end - - @impl true - def add(roles), do: roles - - @spec hello(Hello.t()) :: Wampex.message() - def hello(%Hello{realm: r, roles: roles, agent: agent, options: opts}) do - options = - Map.merge(opts, %{agent: agent, roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}) - - [@hello, r, options] - end - - @spec welcome(Welcome.t()) :: Wampex.message() - def welcome(%Welcome{session_id: si, options: opts}) do - [@welcome, si, opts] - end - - @spec abort(Abort.t()) :: Wampex.message() - def abort(%Abort{reason: reason, details: opts}) do - [@abort, opts, reason] - end - - @spec challenge(Challenge.t()) :: Wampex.message() - def challenge(%Challenge{auth_method: am, options: opts}) do - [@challenge, am, opts] - end - - @spec goodbye(Goodbye.t()) :: Wampex.message() - def goodbye(%Goodbye{reason: r, options: opts}) do - [@goodbye, opts, r] - end - - @spec authenticate(Authenticate.t()) :: Wampex.message() - def authenticate(%Authenticate{signature: s, extra: e}) do - [@authenticate, s, e] - end - - @impl true - def handle([@hello, realm, dets]) do - {[{:next_event, :internal, :hello}], nil, {:update, :hello, {:hello, realm, dets}}} - end - - @impl true - def handle([@authenticate, sig, dets]) do - {[{:next_event, :internal, :authenticate}], nil, - {:update, :authenticate, {:authenticate, sig, dets}}} - end - - @impl true - def handle([@welcome, session_id, _dets]) do - {[{:next_event, :internal, :established}], nil, {:update, :id, session_id}} - end - - @impl true - def handle([@abort, _dets, reason]) do - {[{:next_event, :internal, :abort}], nil, {:update, :error, reason}} - end - - @impl true - def handle([@goodbye, _dets, reason]) do - {[{:next_event, :internal, :goodbye}], nil, {:update, :goodbye, reason}} - end - - @impl true - def handle([@challenge, auth_method, extra]) do - {[{:next_event, :internal, :challenge}], nil, {:update, :challenge, {auth_method, extra}}} - end - - @impl true - def handle([@error, type, id, dets, error]) do - handle([@error, type, id, dets, error, [], %{}]) - end - - @impl true - def handle([@error, type, id, dets, error, arg_l]) do - handle([@error, type, id, dets, error, arg_l, %{}]) - end - - @impl true - def handle([@error, type, id, dets, error, arg_l, arg_kw]) do - {[{:next_event, :internal, :established}], id, {:error, type, error, dets, arg_l, arg_kw}} - end -end diff --git a/lib/roles/publisher.ex b/lib/roles/publisher.ex deleted file mode 100644 index d31d7ca..0000000 --- a/lib/roles/publisher.ex +++ /dev/null @@ -1,39 +0,0 @@ -defmodule Wampex.Roles.Publisher do - @moduledoc """ - Handles requests and responses for Publishers - """ - - alias Wampex.Role - @behaviour Role - - @publish 16 - @published 17 - - defmodule Publish do - @moduledoc false - @enforce_keys [:topic] - defstruct [:topic, arg_list: [], arg_kw: %{}, options: %{}] - - @type t :: %__MODULE__{ - topic: binary(), - arg_list: list(any()), - arg_kw: map(), - options: map() - } - end - - @impl true - def add(roles) do - Map.put(roles, :publisher, %{}) - end - - @spec publish(Publish.t()) :: Wampex.message() - def publish(%Publish{topic: t, options: opts, arg_list: al, arg_kw: akw}) do - [@publish, opts, t, al, akw] - end - - @impl true - def handle([@published, request_id, id]) do - {[{:next_event, :internal, :established}], request_id, {:ok, id}} - end -end diff --git a/lib/roles/subscriber.ex b/lib/roles/subscriber.ex deleted file mode 100644 index db3e86f..0000000 --- a/lib/roles/subscriber.ex +++ /dev/null @@ -1,81 +0,0 @@ -defmodule Wampex.Roles.Subscriber do - @moduledoc """ - Handles requests and responses for Subscribers - """ - - alias Wampex.Role - @behaviour Role - - @subscribe 32 - @subscribed 33 - @unsubscribe 34 - @unsubscribed 35 - @event 36 - - defmodule Subscribe do - @moduledoc false - @enforce_keys [:topic] - defstruct [:topic, options: %{}] - - @type t :: %__MODULE__{ - topic: binary(), - options: map() - } - end - - defmodule Unsubscribe do - @moduledoc false - @enforce_keys [:subscription_id] - defstruct [:subscription_id] - - @type t :: %__MODULE__{ - subscription_id: integer() - } - end - - @impl true - def add(roles) do - Map.put(roles, :subscriber, %{}) - end - - @spec subscribe(Subscribe.t()) :: Wampex.message() - def subscribe(%Subscribe{topic: t, options: opts}) do - [@subscribe, opts, t] - end - - @spec unsubscribe(Unsubscribe.t()) :: Wampex.message() - def unsubscribe(%Unsubscribe{subscription_id: si}) do - [@unsubscribe, si] - end - - @impl true - def handle([@subscribed, request_id, id]) do - {[{:next_event, :internal, :established}], request_id, {:ok, id}} - end - - @impl true - def handle(<<@unsubscribed, request_id>>) do - handle([@unsubscribed, request_id]) - end - - @impl true - def handle([@unsubscribed, request_id]) do - {[{:next_event, :internal, :established}], request_id, :ok} - end - - @impl true - def handle([@event, sub_id, pub_id, dets]) do - handle([@event, sub_id, pub_id, dets, [], %{}]) - end - - @impl true - def handle([@event, sub_id, pub_id, dets, arg_l]) do - handle([@event, sub_id, pub_id, dets, arg_l, %{}]) - end - - @impl true - def handle([@event, sub_id, pub_id, dets, arg_l, arg_kw]) do - {[{:next_event, :internal, :event}], nil, - {:update, :event, {:event, sub_id, pub_id, dets, arg_l, arg_kw}}} - end -end diff --git a/lib/router/admin.ex b/lib/router/admin.ex index 51866dd..1d484fb 100644 --- a/lib/router/admin.ex +++ b/lib/router/admin.ex @@ -3,6 +3,7 @@ defmodule Wampex.Router.Admin do use GenServer require Logger alias Wampex.Roles.Dealer.{Invocation, Result} + alias Wampex.Roles.Peer.Error alias Wampex.Router.Authentication.{User, Realm} alias Wampex.Router.Session @@ -40,10 +41,16 @@ defmodule Wampex.Router.Admin do } = event, {pid, node}}, %{proxy: proxy} = state ) do - realm = Realm.get(uri: realm) - %User{id: id} = User.create(authid: authid, password: password, realm: realm) - Logger.info("Admin handled event: #{inspect(event)}") - send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid}) + with realm <- Realm.get(uri: realm), + %User{id: id} <- User.create(authid: authid, password: password, realm: realm) do + Logger.info("Admin handled event: #{inspect(event)}") + + send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid}) + else + _er -> + send({proxy, node}, {%Error{request_id: req_id, error: "Error registering user"}, pid}) + end + {:noreply, state} end end diff --git a/lib/router/authentication.ex b/lib/router/authentication.ex index 5ebf970..56c59e9 100644 --- a/lib/router/authentication.ex +++ b/lib/router/authentication.ex @@ -39,6 +39,13 @@ defmodule Wampex.Router.Authentication do } end + def parse_challenge(challenge) do + ch = JSON.deserialize!(challenge.challenge) + + {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]), + get_in(ch, ["authprovider"])} + end + def authenticate(signature, realm, authid, %{ challenge: challenge }) do diff --git a/lib/router/authentication/user.ex b/lib/router/authentication/user.ex index ea470af..65bf4f9 100644 --- a/lib/router/authentication/user.ex +++ b/lib/router/authentication/user.ex @@ -18,7 +18,6 @@ defmodule Wampex.Router.Authentication.User do use Ecto.Schema alias __MODULE__ - alias Ecto.Migration alias Wampex.Crypto alias Wampex.Router.Authentication.{Realm, Repo} @@ -34,6 +33,10 @@ defmodule Wampex.Router.Authentication.User do timestamps() end + def get(authid: _authid, realm: nil) do + :invalid_realm + end + def get(authid: authid, realm: realm) do Repo.get_by(User, authid: authid, realm_id: realm.id) end diff --git a/lib/router/session.ex b/lib/router/session.ex index 2cd7847..106437a 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -11,7 +11,6 @@ defmodule Wampex.Router.Session do alias Wampex.Roles.{Broker, Caller, Dealer, Peer} alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome} alias Wampex.Router - alias Wampex.Serializers.JSON alias Broker.{Subscribed, Published, Event, Unsubscribed} alias Dealer.{Invocation, Registered, Result, Unregistered} alias Router.Authentication @@ -172,7 +171,7 @@ defmodule Wampex.Router.Session do %SL{ data: %Sess{ - id: id, + id: session_id, transport: tt, transport_pid: t, hello: {:hello, realm, dets}, @@ -181,35 +180,7 @@ defmodule Wampex.Router.Session do } = data } = sl ) do - Logger.info("Hello #{inspect(dets)}") - - {actions, challenge} = - case dets do - %{"authid" => ai, "authmethods" => am} -> - case auth.authenticate?(am) do - true -> - ch = auth.challenge(realm, ai, id) - - chal = %Challenge{ - auth_method: auth.method(), - options: ch - } - - send_to_peer( - Peer.challenge(chal), - tt, - t - ) - - {[], ch} - - false -> - {[{:next_event, :internal, :abort}], nil} - end - - %{} -> - {[{:next_event, :internal, :abort}], nil} - end + {actions, challenge} = maybe_challenge(auth, realm, dets, session_id, tt, t) {:ok, %SL{ @@ -241,36 +212,9 @@ defmodule Wampex.Router.Session do } } = sl ) do - ch = JSON.deserialize!(challenge.challenge) - authid = get_in(ch, ["authid"]) - authrole = get_in(ch, ["authrole"]) - authmethod = get_in(ch, ["authmethod"]) - authprovider = get_in(ch, ["authprovider"]) - - actions = - case auth.authenticate(sig, realm, authid, challenge) do - true -> - send_to_peer( - Peer.welcome(%Welcome{ - session_id: session_id, - options: %{ - agent: "WAMPex Router", - authid: authid, - authrole: authrole, - authmethod: authmethod, - authprovider: authprovider, - roles: %{broker: %{}, dealer: %{}} - } - }), - tt, - t - ) + info = auth.parse_challenge(challenge) - [{:next_event, :internal, :transition}] - - false -> - [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}] - end + actions = maybe_welcome(auth, session_id, sig, realm, challenge, info, tt, t) {:ok, sl, actions} end @@ -610,6 +554,12 @@ defmodule Wampex.Router.Session do {:ok, data, []} end + @impl true + def handle_info(%Error{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do + send_to_peer(Caller.call_error(e), tt, t) + {:ok, data, []} + end + @impl true def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do {:ok, %SL{data | data: %Sess{sess | message: message}}, @@ -650,6 +600,70 @@ defmodule Wampex.Router.Session do ) end + defp maybe_welcome( + auth, + session_id, + sig, + realm, + challenge, + {authid, authrole, authmethod, authprovider}, + tt, + t + ) do + case auth.authenticate(sig, realm, authid, challenge) do + true -> + send_to_peer( + Peer.welcome(%Welcome{ + session_id: session_id, + options: %{ + agent: "WAMPex Router", + authid: authid, + authrole: authrole, + authmethod: authmethod, + authprovider: authprovider, + roles: %{broker: %{}, dealer: %{}} + } + }), + tt, + t + ) + + [{:next_event, :internal, :transition}] + + false -> + [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}] + end + end + + defp maybe_challenge(auth, realm, details, session_id, tt, t) do + case details do + %{"authid" => ai, "authmethods" => am} -> + case auth.authenticate?(am) do + true -> + ch = auth.challenge(realm, ai, session_id) + + chal = %Challenge{ + auth_method: auth.method(), + options: ch + } + + send_to_peer( + Peer.challenge(chal), + tt, + t + ) + + {[], ch} + + false -> + {[{:next_event, :internal, :abort}], nil} + end + + %{} -> + {[{:next_event, :internal, :abort}], nil} + end + end + defp remove_subscription(subs, subscription_id, realm, db) do {id, topic, wc} = sub = diff --git a/lib/serializer.ex b/lib/serializer.ex deleted file mode 100644 index 7879878..0000000 --- a/lib/serializer.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule Wampex.Serializer do - @moduledoc "Behaviour for Serializers" - @type data_type :: :binary | :text - @callback data_type() :: data_type() - @callback serialize!(data :: Wampex.message()) :: binary() - @callback serialize(data :: Wampex.message()) :: {:ok, binary()} - @callback deserialize!(data :: binary()) :: Wampex.message() - @callback deserialize(data :: binary()) :: {:ok, Wampex.message()} -end diff --git a/lib/serializers/json.ex b/lib/serializers/json.ex deleted file mode 100644 index 732bcdb..0000000 --- a/lib/serializers/json.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule Wampex.Serializers.JSON do - @moduledoc "JSON Serializer" - @behaviour Wampex.Serializer - - @impl true - def data_type, do: :text - @impl true - def serialize!(data), do: Jason.encode!(data) - @impl true - def serialize(data), do: Jason.encode(data) - @impl true - def deserialize!(binary), do: Jason.decode!(binary) - @impl true - def deserialize(binary), do: Jason.decode(binary) -end diff --git a/lib/serializers/message_pack.ex b/lib/serializers/message_pack.ex deleted file mode 100644 index e048a12..0000000 --- a/lib/serializers/message_pack.ex +++ /dev/null @@ -1,20 +0,0 @@ -defmodule Wampex.Serializers.MessagePack do - @moduledoc "MessgePack Serializer" - @behaviour Wampex.Serializer - - @impl true - def data_type, do: :binary - @impl true - def serialize!(data), do: :msgpack.pack(data, []) - @impl true - def serialize(data), do: {:ok, :msgpack.pack(data, [])} - @impl true - def deserialize!(data) do - {:ok, res} = :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}]) - res - end - - @impl true - def deserialize(data), - do: :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}]) -end diff --git a/lib/transport.ex b/lib/transport.ex deleted file mode 100644 index b801962..0000000 --- a/lib/transport.ex +++ /dev/null @@ -1,13 +0,0 @@ -defmodule Wampex.Transport do - @moduledoc "Behaviour for Transports" - @callback send_request(transport :: atom() | pid(), message :: Wampex.message()) :: :ok - @callback start_link( - url: binary(), - session: Wampex.Client.Session.t(), - protocol: binary(), - serializer: module(), - opts: [] - ) :: - {:ok, pid} - | {:error, term} -end diff --git a/lib/wampex.ex b/lib/wampex.ex deleted file mode 100644 index bdf7f14..0000000 --- a/lib/wampex.ex +++ /dev/null @@ -1,64 +0,0 @@ -defmodule Wampex do - @moduledoc """ - Types for wampex - """ - @type message_part :: integer() | binary() | map() | list() - @type message :: nonempty_list(message_part()) - @type arg_list :: [] | nonempty_list(any()) - @type arg_keyword :: map() - - @type hello :: {:hello, realm :: String.t(), details :: map()} - - @type authenticate :: {:authenticate, signature :: binary(), details :: map()} - - @type call :: - {:call, request_id :: integer(), details :: map(), procedure :: String.t(), - arg_list :: arg_list(), arg_keywords :: arg_keyword()} - - @type yield :: - {:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(), - arg_keywords :: arg_keyword()} - - @type publish :: - {:publish, request_id :: integer(), details :: map(), topic :: String.t(), - arg_list :: arg_list(), arg_keywords :: arg_keyword()} - - @type register :: - {:register, request_id :: integer(), details :: map(), procedure :: String.t()} - - @type unregister :: {:unregister, request_id :: integer(), registration_id :: integer()} - - @type unsubscribe :: - {:unsubscribe, request_id :: integer(), id :: integer()} - - @type subscribe :: - {:subscribe, request_id :: integer(), details :: map(), topic :: String.t()} - @type invocation :: - {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(), - arg_list :: arg_list(), arg_keywords :: arg_keyword()} - @type event :: - {:event, subscription_id :: integer(), publication_id :: integer(), details :: map(), - arg_list :: arg_list(), arg_keyword :: arg_keyword()} - @type error :: - {:error, reason :: binary()} - | {:error, type :: integer(), error :: binary(), details :: map(), - arg_list :: arg_list(), arg_keyword :: arg_keyword()} - @type messages :: - publish() - | hello() - | authenticate() - | unsubscribe() - | subscribe() - | invocation() - | register() - | unregister() - | call() - | yield() - | error() - | event() - - @type handle_response :: - {:ok, integer()} - | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()} - | {:update, atom(), messages()} -end diff --git a/mix.exs b/mix.exs index 33ccc3e..fe1474b 100644 --- a/mix.exs +++ b/mix.exs @@ -1,9 +1,9 @@ -defmodule Wampex.MixProject do +defmodule Wampex.Router.MixProject do use Mix.Project def project do [ - app: :wampex, + app: :wampex_router, version: "0.1.0", elixir: "~> 1.9", start_permanent: Mix.env() == :prod, @@ -53,7 +53,8 @@ defmodule Wampex.MixProject do {:plug_cowboy, "~> 2.1"}, {:postgrex, ">= 0.0.0"}, {:states_language, "~> 0.2"}, - {:websockex, "~> 0.4"} + {:wampex, path: "../wampex"}, + {:wampex_client, path: "../wampex_client", only: [:dev, :test]} ] end diff --git a/mix.lock b/mix.lock index 3bae761..b3b8586 100644 --- a/mix.lock +++ b/mix.lock @@ -5,10 +5,9 @@ "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"}, "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"}, - "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"}, "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"}, "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"}, - "credo": {:hex, :credo, "1.2.2", "f57faf60e0a12b0ba9fd4bad07966057fde162b33496c509b95b027993494aab", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8f2623cd8c895a6f4a55ef10f3fdf6a55a9ca7bef09676bd835551687bf8a740"}, + "credo": {:hex, :credo, "1.3.1", "082e8d9268a489becf8e7aa75671a7b9088b1277cd6c1b13f40a55554b3f5126", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "0da816ed52fa520b9ea0e5d18a0d3ca269e0bd410b1174d88d8abd94be6cce3c"}, "db_connection": {:hex, :db_connection, "2.2.1", "caee17725495f5129cb7faebde001dc4406796f12a62b8949f4ac69315080566", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "2b02ece62d9f983fcd40954e443b7d9e6589664380e5546b2b9b523cd0fb59e1"}, "decimal": {:hex, :decimal, "1.8.1", "a4ef3f5f3428bdbc0d35374029ffcf4ede8533536fa79896dd450168d9acdf3c", [:mix], [], "hexpm", "3cb154b00225ac687f6cbd4acc4b7960027c757a5152b369923ead9ddbca7aec"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, @@ -17,14 +16,13 @@ "ecto_sql": {:hex, :ecto_sql, "3.3.4", "aa18af12eb875fbcda2f75e608b3bd534ebf020fc4f6448e4672fcdcbb081244", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.4 or ~> 3.3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.3.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5eccbdbf92e3c6f213007a82d5dbba4cd9bb659d1a21331f89f408e4c0efd7a8"}, "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"}, "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"}, - "excoveralls": {:hex, :excoveralls, "0.12.2", "a513defac45c59e310ac42fcf2b8ae96f1f85746410f30b1ff2b710a4b6cd44b", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "151c476331d49b45601ffc45f43cb3a8beb396b02a34e3777fea0ad34ae57d89"}, + "excoveralls": {:hex, :excoveralls, "0.12.3", "2142be7cb978a3ae78385487edda6d1aff0e482ffc6123877bb7270a8ffbcfe0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "568a3e616c264283f5dea5b020783ae40eef3f7ee2163f7a67cbd7b35bcadada"}, "hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "e0100f8ef7d1124222c11ad362c857d3df7cb5f4204054f9f0f4a728666591fc"}, "idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"}, - "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, + "jason": {:hex, :jason, "1.2.0", "10043418c42d2493d0ee212d3fddd25d7ffe484380afad769a0a38795938e448", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "116747dbe057794c3a3e4e143b7c8390b29f634e16c78a7f59ba75bfa6852e7f"}, "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"}, - "jsx": {:hex, :jsx, "2.10.0", "77760560d6ac2b8c51fd4c980e9e19b784016aa70be354ce746472c33beb0b1c", [:rebar3], [], "hexpm", "9a83e3704807298016968db506f9fad0f027de37546eb838b3ae1064c3a0ad62"}, - "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"}, - "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"}, + "libcluster": {:hex, :libcluster, "3.2.1", "b2cd5b447cde25d5897749bee6f7aaeb6c96ac379481024e9b6ba495dabeb97d", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "89f225612d135edce9def56f43bf18d575d88ac4680e3f6161283f2e55cadca4"}, + "libring": {:hex, :libring, "1.5.0", "44313eb6862f5c9168594a061e9d5f556a9819da7c6444706a9e2da533396d70", [:mix], [], "hexpm", "04e843d4fdcff49a62d8e03778d17c6cb2a03fe2d14020d3825a1761b55bd6cc"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, diff --git a/priv/repo/migrations/20200319210508_create_realms.exs b/priv/repo/migrations/20200319210508_create_realms.exs index 0ec59d6..86f165b 100644 --- a/priv/repo/migrations/20200319210508_create_realms.exs +++ b/priv/repo/migrations/20200319210508_create_realms.exs @@ -18,7 +18,7 @@ defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do add(:salt, :string, null: false) add(:iterations, :integer, null: false) add(:keylen, :integer, null: false) - add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all)) + add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false) timestamps() end diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 4e874f9..3544821 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -2,7 +2,7 @@ defmodule WampexTest do use ExUnit.Case, async: true doctest Wampex - alias Wampex.{Authentication, Realm} + alias Wampex.Client.{Authentication, Realm} alias Wampex.Client alias Wampex.Roles.{Broker, Callee, Caller, Dealer, Peer, Publisher, Subscriber} alias Wampex.Router @@ -351,6 +351,21 @@ defmodule WampexTest do assert is_binary(id) end + @tag :client + test "admin callee is invoked and responds with error" do + caller_name = TestAdminErrorCaller + Client.start_link(name: caller_name, session: @session) + + assert {:error, 48, "Error registering user", %{}, [], %{}} = + Client.send_request( + caller_name, + Caller.call(%Call{ + procedure: "admin.create_user", + arg_kw: %{authid: "chris", password: "woot!", realm: "not.real"} + }) + ) + end + @tag :client test "callee is invoked and responds and caller gets result" do callee_name = TestCalleeRespond