From: Christopher Date: Fri, 20 Mar 2020 21:40:49 +0000 (-0500) Subject: moving router and client to their own repos X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=88662824b7d0e1747583fc657b1f9f9f83bed37f;p=wampex.git moving router and client to their own repos --- diff --git a/lib/authentication.ex b/lib/authentication.ex deleted file mode 100644 index 419fb2d..0000000 --- a/lib/authentication.ex +++ /dev/null @@ -1,32 +0,0 @@ -defmodule Wampex.Authentication do - @moduledoc false - - alias Wampex.Crypto - - @enforce_keys [:authid, :authmethods, :secret] - defstruct [:authid, :authmethods, :secret] - - @callback handle(challenge :: binary(), auth :: __MODULE__.t()) :: {binary(), map()} - - @type t :: %__MODULE__{ - authid: binary(), - authmethods: [binary()], - secret: binary() - } - - def handle( - {"wampcra", %{"challenge" => ch, "salt" => salt, "iterations" => it, "keylen" => len}}, - auth - ) do - {auth - |> Map.get(:secret) - |> Crypto.pbkdf2(salt, it, len) - |> Crypto.hash_challenge(ch), %{}} - end - - def handle({"wampcra", %{"challenge" => ch}}, auth) do - {auth - |> Map.get(:secret) - |> Crypto.hash_challenge(ch), %{}} - end -end diff --git a/lib/client.ex b/lib/client.ex deleted file mode 100644 index b00a29a..0000000 --- a/lib/client.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Wampex.Client do - @moduledoc """ - Documentation for Wampex. - """ - use Supervisor - - alias Wampex.Client.Session, as: Sess - alias Wampex.Roles.{Callee, Subscriber} - alias Wampex.Roles.Callee.Register - alias Wampex.Roles.Subscriber.Subscribe - - @spec start_link(name: atom(), session_data: Sess.t()) :: - {:ok, pid()} - | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} - def start_link(name: name, session: session_data) when is_atom(name) do - Supervisor.start_link(__MODULE__, {name, session_data}, name: name) - end - - @spec init({atom(), Sess.t()}) :: - {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore - def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do - session = session_name(name) - subscriber_registry = subscriber_registry_name(name) - callee_registry = callee_registry_name(name) - transport = transport_name(name) - session_data = %Sess{session_data | name: name, transport_pid: transport} - - children = [ - {Sess, [{:local, session}, session_data, []]}, - {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]}, - {Registry, - [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]}, - {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]} - ] - - Supervisor.init(children, strategy: :one_for_all, max_restarts: 0) - end - - @spec session_name(module()) :: module() - def session_name(name), do: Module.concat([name, Session]) - @spec subscriber_registry_name(module()) :: module() - def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry]) - @spec callee_registry_name(module()) :: module() - def callee_registry_name(name), do: Module.concat([name, CalleeRegistry]) - @spec transport_name(module()) :: module() - def transport_name(name), do: Module.concat([name, Transport]) - - @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) :: - {:ok, integer()} - def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do - {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout) - Registry.register(subscriber_registry_name(name), id, t) - {:ok, id} - end - - @spec register(name :: module(), register :: Register.t(), timeout :: integer()) :: - {:ok, integer()} - def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do - {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout) - Registry.register(callee_registry_name(name), id, p) - {:ok, id} - end - - @spec send_request(name :: module(), request :: Wampex.message(), timeout :: integer()) :: - term() - def send_request(name, request, timeout \\ 5000) do - Sess.send_request(session_name(name), request, timeout) - end - - @spec cast_send_request(name :: module(), Wampex.message()) :: :ok - def cast_send_request(name, request) do - Sess.cast_send_request(session_name(name), request) - end -end diff --git a/lib/client/session.ex b/lib/client/session.ex deleted file mode 100644 index 5c55a1d..0000000 --- a/lib/client/session.ex +++ /dev/null @@ -1,451 +0,0 @@ -defmodule Wampex.Client.Session do - @moduledoc """ - A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation - """ - use StatesLanguage, data: "priv/client.json" - - @max_id 9_007_199_254_740_992 - - alias StatesLanguage, as: SL - alias Wampex.Realm - alias Wampex.Roles.Peer - alias Wampex.Roles.Peer.{Authenticate, Hello} - alias Wampex.Serializers.MessagePack - alias __MODULE__, as: Sess - alias Wampex.Client - alias Wampex.Client.Transports.WebSocket - - @yield 70 - @hello 1 - @abort 3 - - @enforce_keys [:url, :roles] - - defstruct [ - :id, - :url, - :transport_pid, - :message, - :event, - :invocation, - :goodbye, - :error, - :realm, - :name, - :roles, - :challenge, - :interrupt, - message_queue: [], - request_id: 0, - protocol: "wamp.2.msgpack", - transport: WebSocket, - serializer: MessagePack, - requests: [] - ] - - @type t :: %__MODULE__{ - id: integer() | nil, - url: binary(), - transport_pid: pid() | module() | nil, - message: Wampex.message() | nil, - event: Wampex.event() | nil, - invocation: Wampex.invocation() | nil, - goodbye: binary() | nil, - realm: Realm.t(), - name: module() | nil, - roles: [module()], - challenge: {binary(), binary()} | nil, - interrupt: integer() | nil, - message_queue: [], - request_id: integer(), - protocol: binary(), - transport: module(), - serializer: module(), - requests: [] - } - - ## States - @established "Established" - @init "Init" - @challenge "Challenge" - @abort "Abort" - @goodbye "Goodbye" - @event "Event" - @invocation "Invocation" - @interrupt "Interrupt" - @handshake "Handshake" - - ## Resources - @init_transport "InitTransport" - @wait_transport "WaitForTransport" - @hello "Hello" - @handle_challenge "HandleChallenge" - @handle_established "HandleEstablished" - @handle_abort "HandleAbort" - @handle_goodbye "HandleGoodbye" - @handle_event "HandleEvent" - @handle_invocation "HandleInvocation" - @handle_message "HandleMessage" - @handle_interrupt "HandleInterrupt" - @handle_handshake "HandleHandshake" - - @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) :: - :ok - def cast_send_request(name, request, pid) do - __MODULE__.cast(name, {:send_request, request, pid}) - end - - @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok - def cast_send_request(name, request) do - __MODULE__.cast(name, {:send_request, request}) - end - - @spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) :: - term() - def send_request(name, request, timeout) do - __MODULE__.call(name, {:send_request, request}, timeout) - end - - @impl true - def handle_call( - {:send_request, request}, - from, - @established, - %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data - ) do - request_id = do_send(r_id, tt, t, request) - - {:ok, - %SL{ - data - | data: %Sess{ - sess - | request_id: request_id, - requests: [{request_id, from} | sess.requests] - } - }, []} - end - - @impl true - def handle_call( - {:send_request, request}, - from, - _, - %SL{data: %Sess{message_queue: mq} = sess} = data - ) do - Logger.debug("Queueing request: #{inspect(request)}") - {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []} - end - - @impl true - def handle_cast( - {:send_request, request, from}, - @established, - %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data - ) do - request_id = do_send(r_id, tt, t, request) - - {:ok, - %SL{ - data - | data: %Sess{ - sess - | request_id: request_id, - requests: [{request_id, from} | sess.requests] - } - }, []} - end - - @impl true - def handle_cast( - {:send_request, request}, - @established, - %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data - ) do - request_id = do_send(r_id, tt, t, request) - - {:ok, - %SL{ - data - | data: %Sess{sess | request_id: request_id} - }, []} - end - - @impl true - def handle_cast( - {:send_request, request}, - _, - %SL{data: %Sess{message_queue: mq} = sess} = data - ) do - Logger.debug("Queueing request: #{inspect(request)}") - {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []} - end - - @impl true - def handle_resource( - @init_transport, - _, - @wait_transport, - data - ) do - Logger.debug("Waiting for transport to connect...") - {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []} - end - - @impl true - def handle_resource( - @hello, - _, - @init, - %SL{ - data: %Sess{ - transport: tt, - transport_pid: t, - roles: roles, - realm: %Realm{name: realm, authentication: auth} - } - } = data - ) do - message = maybe_authentication(%Hello{realm: realm, roles: roles}, auth) - tt.send_request(t, Peer.hello(message)) - {:ok, data, [{:next_event, :internal, :hello_sent}]} - end - - @impl true - def handle_resource( - @handle_challenge, - _, - @challenge, - %SL{ - data: %Sess{ - transport: tt, - transport_pid: t, - challenge: challenge, - realm: %Realm{authentication: auth} - } - } = sl - ) do - {signature, extra} = auth.__struct__.handle(challenge, auth) - tt.send_request(t, Peer.authenticate(%Authenticate{signature: signature, extra: extra})) - {:ok, sl, [{:next_event, :internal, :challenged}]} - end - - @impl true - def handle_resource( - @goodbye, - _, - @handle_goodbye, - %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data - ) do - tt.send_message(t, Peer.goodbye(goodbye)) - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_message, - _, - _, - %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data - ) do - Logger.debug("Handling Message #{inspect(msg)}") - {actions, id, response} = handle_message(msg, roles) - {%SL{data: sess} = data, response} = maybe_update_response(data, response) - {requests, actions} = resp = handle_response(id, actions, requests, response) - Logger.debug("Response: #{inspect(resp)}") - {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions} - end - - @impl true - def handle_resource(@handle_abort, _, @abort, data) do - Logger.warn("Aborting: #{data.data.error}") - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_established, - _, - @established, - %SL{ - data: - %Sess{ - request_id: r_id, - transport: tt, - transport_pid: t, - message_queue: mq, - requests: reqs - } = sess - } = data - ) do - {request_id, requests} = send_message_queue(r_id, mq, tt, t, reqs) - requests = remove_cast_requests(requests) - - {:ok, - %SL{ - data - | data: %Sess{sess | requests: requests, request_id: request_id, message_queue: []} - }, []} - end - - @impl true - def handle_resource(@handle_handshake, _, @handshake, data) do - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_event, - _, - @event, - %SL{data: %Sess{name: name, event: event}} = sl - ) do - Logger.debug("Received Event #{inspect(event)}") - - sub = Client.subscriber_registry_name(name) - - Registry.dispatch(sub, elem(event, 1), fn entries -> - for {pid, _topic} <- entries, do: send(pid, event) - end) - - {:ok, sl, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_invocation, - _, - @invocation, - %SL{data: %Sess{name: name, invocation: invocation}} = sl - ) do - Logger.debug("Received Invocation #{inspect(invocation)}") - reg = Client.callee_registry_name(name) - - Registry.dispatch(reg, elem(invocation, 2), fn entries -> - for {pid, _procedure} <- entries, do: send(pid, invocation) - end) - - {:ok, sl, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_interrupt, - _, - @interrupt, - %SL{data: %Sess{interrupt: {id, opts}, name: name}} = data - ) do - [{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id) - send(callee, {:interrupt, id, opts}) - {:ok, data, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource(resource, _, state, data) do - Logger.error("No specific resource handler for #{resource} in state #{state}") - {:ok, data, []} - end - - @impl true - def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do - {:ok, %SL{data | data: %Sess{sess | message: message}}, - [{:next_event, :internal, :message_received}]} - end - - defp handle_response(nil, actions, requests, _), do: {requests, actions} - - defp handle_response(id, actions, requests, response) do - Enum.reduce_while(requests, {requests, actions}, fn - {^id, nil}, _ -> - {:halt, {remove_request(id, requests), actions}} - - {^id, from}, _ -> - {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}} - - {_, _}, acc -> - {:cont, acc} - end) - end - - defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do - send(from, {:progress, arg_l, arg_kw}) - [] - end - - defp get_response_action(from, response), do: {:reply, from, response} - - defp maybe_update_response(data, {:update, key, resp}) do - {%SL{data | data: Map.put(data.data, key, resp)}, resp} - end - - defp maybe_update_response(data, resp), do: {data, resp} - - defp do_send(r_id, tt, t, message) do - {request_id, message} = maybe_inject_request_id(r_id, message) - tt.send_request(t, remove_nil_values(message)) - request_id - end - - defp maybe_authentication(message, nil), do: message - - defp maybe_authentication(%Hello{} = message, auth) do - %Hello{message | options: Map.from_struct(auth)} - end - - defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1) - - defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message} - defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message} - defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message} - - defp maybe_inject_request_id(r_id, message) do - request_id = get_request_id(r_id) - {request_id, List.insert_at(message, 1, request_id)} - end - - defp get_request_id(current_id) when current_id == @max_id do - 1 - end - - defp get_request_id(current_id) do - current_id + 1 - end - - defp remove_request(id, requests) do - Enum.filter(requests, fn - {^id, _} -> false - {_id, _} -> true - end) - end - - defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs} - - defp send_message_queue(r_id, mq, tt, t, reqs) do - mq - |> Enum.reverse() - |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} -> - r = do_send(id, tt, t, request) - {r, [{r, from} | requests]} - end) - end - - def remove_cast_requests([]), do: [] - - def remove_cast_requests(requests) do - Enum.filter(requests, fn - {_, nil} -> false - {_, _} -> true - end) - end - - defp handle_message(msg, roles) do - Enum.reduce_while(roles, nil, fn r, _ -> - try do - res = r.handle(msg) - {:halt, res} - rescue - FunctionClauseError -> {:cont, nil} - end - end) - end -end diff --git a/lib/client/transports/web_socket.ex b/lib/client/transports/web_socket.ex deleted file mode 100644 index b0c3d85..0000000 --- a/lib/client/transports/web_socket.ex +++ /dev/null @@ -1,78 +0,0 @@ -defmodule Wampex.Client.Transports.WebSocket do - use WebSockex - - @behaviour Wampex.Transport - - alias WebSockex.Conn - - require Logger - - @header "Sec-WebSocket-Protocol" - @ping_interval 20_000 - - @impl true - def send_request(t, data) do - WebSockex.cast(t, {:send_request, data}) - end - - @impl true - def start_link( - url: url, - session: session, - protocol: protocol, - serializer: serializer, - opts: opts - ) do - url - |> Conn.new(extra_headers: [{@header, protocol}]) - |> WebSockex.start_link( - __MODULE__, - %{session: session, serializer: serializer, connected: false}, - opts - ) - end - - @impl true - def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do - Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}") - Process.send_after(self(), :ping, @ping_interval) - send(state.session, {:connected, true}) - {:ok, %{state | connected: true}} - end - - @impl true - def handle_ping(_ping, state) do - {:reply, :pong, state} - end - - @impl true - def handle_pong(_pong, state) do - {:ok, state} - end - - @impl true - def handle_cast({:send_request, data}, %{serializer: s} = state) do - Logger.debug("Sending Request #{inspect(data)}") - {:reply, {s.data_type(), s.serialize!(data)}, state} - end - - @impl true - def handle_cast({:connected, resp}, state) do - send(resp, {:connected, state.connected}) - {:ok, state} - end - - @impl true - def handle_info(:ping, state) do - Process.send_after(self(), :ping, @ping_interval) - {:reply, :ping, state} - end - - @impl true - def handle_frame({type, message}, %{serializer: s} = state) do - message = s.deserialize!(message) - Logger.debug("Received #{type} data: #{inspect(message)}") - send(state.session, {:set_message, message}) - {:ok, state} - end -end diff --git a/lib/realm.ex b/lib/realm.ex deleted file mode 100644 index d218bbd..0000000 --- a/lib/realm.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule Wampex.Realm do - @moduledoc "Defines the WAMP Realm" - alias Wampex.Authentication - - @enforce_keys [:name] - defstruct [:name, :authentication] - - @type t :: %__MODULE__{ - name: binary(), - authentication: Authentication.t() | nil - } -end diff --git a/lib/router.ex b/lib/router.ex deleted file mode 100644 index 2650155..0000000 --- a/lib/router.ex +++ /dev/null @@ -1,76 +0,0 @@ -defmodule Wampex.Router do - use Supervisor - - alias Wampex.Router.{Admin, Authentication, Proxy, REST} - alias Wampex.Router.Authentication.EnsureDefaultAdmin - alias Wampex.Router.Transports.WebSocket - - @spec start_link( - name: module(), - port: pos_integer(), - topologies: [], - replicas: integer(), - quorum: integer(), - admin_realm: String.t(), - admin_authid: String.t(), - admin_password: String.t() - ) :: - {:ok, pid()} - | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} - def start_link( - name: name, - port: port, - topologies: topologies, - replicas: repls, - quorum: quorum, - admin_realm: admin_realm, - admin_authid: admin_authid, - admin_password: admin_password - ) - when is_atom(name) do - Supervisor.start_link( - __MODULE__, - {name, port, topologies, repls, quorum, admin_realm, admin_authid, admin_password}, - name: name - ) - end - - @spec init( - {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(), - String.t()} - ) :: - {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore - def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do - children = [ - Authentication.Repo, - {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]}, - {ClusterKV, - [ - name: db_name(name), - topologies: topologies, - replicas: replicas, - quorum: quorum - ]}, - {Proxy, [name: proxy_name(name)]}, - {Admin, - [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]}, - {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]} - ] - - Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10) - end - - def db_name(name), do: Module.concat([name, KV]) - def proxy_name(name), do: Module.concat([name, Proxy]) - def admin_name(name), do: Module.concat([name, Admin]) - - defp dispatch(name) do - [ - {:_, - [ - {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}}, - {:_, Plug.Cowboy.Handler, {REST, []}} - ]} - ] - end -end diff --git a/lib/router/admin.ex b/lib/router/admin.ex deleted file mode 100644 index 51866dd..0000000 --- a/lib/router/admin.ex +++ /dev/null @@ -1,49 +0,0 @@ -defmodule Wampex.Router.Admin do - @moduledoc false - use GenServer - require Logger - alias Wampex.Roles.Dealer.{Invocation, Result} - alias Wampex.Router.Authentication.{User, Realm} - alias Wampex.Router.Session - - @procedures [ - "admin.create_user", - "admin.create_realm" - ] - - def start_link(name: name, db: db, realm: realm, proxy: proxy) do - GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name) - end - - def init({db, realm, proxy}) do - {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}} - end - - def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do - regs = register_procedures(db, realm, regs) - {:noreply, %{state | regs: regs}} - end - - def register_procedures(db, realm, regs) do - Enum.reduce(@procedures, regs, fn proc, acc -> - val = {Session.get_global_id(), {self(), Node.self()}} - ClusterKV.put(db, realm, proc, val) - [{proc, val} | acc] - end) - end - - def handle_info( - {req_id, - %Invocation{ - arg_kw: %{"realm" => realm, "authid" => authid, "password" => password}, - options: %{"procedure" => "admin.create_user"} - } = event, {pid, node}}, - %{proxy: proxy} = state - ) do - realm = Realm.get(uri: realm) - %User{id: id} = User.create(authid: authid, password: password, realm: realm) - Logger.info("Admin handled event: #{inspect(event)}") - send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid}) - {:noreply, state} - end -end diff --git a/lib/router/authentication.ex b/lib/router/authentication.ex deleted file mode 100644 index 5ebf970..0000000 --- a/lib/router/authentication.ex +++ /dev/null @@ -1,60 +0,0 @@ -defmodule Wampex.Router.Authentication do - @moduledoc false - - require Logger - - alias Wampex.Crypto - alias Wampex.Serializers.JSON - alias Wampex.Router.Authentication.{Realm, User} - - @wampcra "wampcra" - @auth_provider "userdb" - @auth_role "user" - - def authenticate?(methods) do - can_auth(methods) - end - - def method, do: @wampcra - - def challenge(realm, authid, session_id) do - %Realm{} = realm = Realm.get(uri: realm) - %User{} = user = User.get(authid: authid, realm: realm) - now = DateTime.to_iso8601(DateTime.utc_now()) - - %{ - challenge: - JSON.serialize!(%{ - nonce: Crypto.random_string(user.keylen), - authprovider: @auth_provider, - authid: authid, - timestamp: now, - authrole: @auth_role, - authmethod: @wampcra, - session: session_id - }), - salt: user.salt, - keylen: user.keylen, - iterations: user.iterations - } - end - - def authenticate(signature, realm, authid, %{ - challenge: challenge - }) do - authid - |> get_secret(realm) - |> Crypto.hash_challenge(challenge) - |> :pbkdf2.compare_secure(signature) - end - - defp get_secret(authid, uri) do - realm = Realm.get(uri: uri) - %User{password: password} = User.get(authid: authid, realm: realm) - password - end - - defp can_auth([]), do: false - defp can_auth([@wampcra | _]), do: true - defp can_auth([_ | t]), do: can_auth(t) -end diff --git a/lib/router/authentication/ensure_default_admin.ex b/lib/router/authentication/ensure_default_admin.ex deleted file mode 100644 index 9052191..0000000 --- a/lib/router/authentication/ensure_default_admin.ex +++ /dev/null @@ -1,19 +0,0 @@ -defmodule Wampex.Router.Authentication.EnsureDefaultAdmin do - @moduledoc false - require Logger - use GenServer - - alias Wampex.Router.Authentication.{Realm, User} - - def start_link(uri: uri, authid: authid, password: password) do - GenServer.start_link(__MODULE__, {uri, authid, password}) - end - - def init({uri, authid, password}) do - %Realm{} = realm = Realm.create(uri: uri) - %User{} = user = User.create(authid: authid, password: password, realm: realm) - Logger.info("Realm: #{inspect(realm)}") - Logger.info("User: #{inspect(user)}") - :ignore - end -end diff --git a/lib/router/authentication/realm.ex b/lib/router/authentication/realm.ex deleted file mode 100644 index 10fa68b..0000000 --- a/lib/router/authentication/realm.ex +++ /dev/null @@ -1,41 +0,0 @@ -defmodule Wampex.Router.Authentication.Realm do - @moduledoc """ - CREATE TABLE authentication.realms ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - uri STRING(255) NOT NULL UNIQUE, - parent STRING NULL, - inserted_at TIMESTAMP NOT NULL, - updated_at TIMESTAMP NOT NULL - ); - - """ - use Ecto.Schema - - alias __MODULE__ - alias Wampex.Router.Authentication.Repo - - @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true} - @foreign_key_type :binary_id - schema "realms" do - field(:uri, :string) - field(:parent, :string) - timestamps() - end - - def get(uri: uri) do - Repo.get_by(Realm, uri: uri) - end - - def create(uri: uri) do - try do - {:ok, r} = - %Realm{uri: uri, parent: uri} - |> Repo.insert() - - r - rescue - _er -> - get(uri: uri) - end - end -end diff --git a/lib/router/authentication/repo.ex b/lib/router/authentication/repo.ex deleted file mode 100644 index 9fe8e40..0000000 --- a/lib/router/authentication/repo.ex +++ /dev/null @@ -1,5 +0,0 @@ -defmodule Wampex.Router.Authentication.Repo do - use Ecto.Repo, - otp_app: :wampex, - adapter: Ecto.Adapters.Postgres -end diff --git a/lib/router/authentication/user.ex b/lib/router/authentication/user.ex deleted file mode 100644 index ea470af..0000000 --- a/lib/router/authentication/user.ex +++ /dev/null @@ -1,66 +0,0 @@ -defmodule Wampex.Router.Authentication.User do - @moduledoc """ - CREATE TABLE authentication.users ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - authid STRING(255) NOT NULL, - password STRING NOT NULL, - salt STRING NOT NULL, - iterations INT NOT NULL, - keylen INT NOT NULL, - realm_id UUID NOT NULL REFERENCES authentication.realms (id) ON DELETE CASCADE, - inserted_at TIMESTAMP NOT NULL, - updated_at TIMESTAMP NOT NULL, - UNIQUE (authid, realm_id), - INDEX (authid) - ); - - """ - - use Ecto.Schema - alias __MODULE__ - alias Ecto.Migration - alias Wampex.Crypto - alias Wampex.Router.Authentication.{Realm, Repo} - - @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true} - @foreign_key_type :binary_id - schema "users" do - field(:authid, :string) - field(:password, :string) - field(:salt, :string) - field(:iterations, :integer) - field(:keylen, :integer) - belongs_to(:realm, Realm) - timestamps() - end - - def get(authid: authid, realm: realm) do - Repo.get_by(User, authid: authid, realm_id: realm.id) - end - - def create(authid: authid, password: password, realm: realm) do - keylen = Application.get_env(:wampex, :keylen) - salt = Crypto.random_string(keylen) - iterations = Enum.random(10_000..50_000) - - try do - password = Crypto.pbkdf2(password, salt, iterations, keylen) - - {:ok, u} = - %User{ - authid: authid, - password: password, - realm: realm, - salt: salt, - iterations: iterations, - keylen: keylen - } - |> Repo.insert() - - u - rescue - _er -> - get(authid: authid, realm: realm) - end - end -end diff --git a/lib/router/proxy.ex b/lib/router/proxy.ex deleted file mode 100644 index 2ddb7d6..0000000 --- a/lib/router/proxy.ex +++ /dev/null @@ -1,19 +0,0 @@ -defmodule Wampex.Router.Proxy do - @moduledoc false - use GenServer - - def start_link(name: name) do - GenServer.start_link(__MODULE__, :ok, name: name) - end - - def init(:ok) do - {:ok, []} - end - - def handle_info({e, to}, state) do - send(to, e) - {:noreply, state} - end - - def handle_call({:is_up, pid}, _from, s), do: {:reply, Process.alive?(pid), s} -end diff --git a/lib/router/rest.ex b/lib/router/rest.ex deleted file mode 100644 index a15ad87..0000000 --- a/lib/router/rest.ex +++ /dev/null @@ -1,25 +0,0 @@ -defmodule Wampex.Router.REST do - @moduledoc false - use Plug.Router - import Plug.Conn - - plug(CORSPlug, origin: ["*"]) - plug(:match) - plug(:dispatch) - - get "/favicon.ico" do - send_resp(conn, 200, "ok") - end - - get "/alivez" do - send_resp(conn, 200, "ok") - end - - get "/readyz" do - send_resp(conn, 200, "ok") - end - - match _ do - send_resp(conn, 404, "oops") - end -end diff --git a/lib/router/session.ex b/lib/router/session.ex deleted file mode 100644 index 2cd7847..0000000 --- a/lib/router/session.ex +++ /dev/null @@ -1,779 +0,0 @@ -defmodule Wampex.Router.Session do - @moduledoc """ - A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation - """ - use StatesLanguage, data: "priv/router.json" - - @max_id 9_007_199_254_740_992 - - alias __MODULE__, as: Sess - alias StatesLanguage, as: SL - alias Wampex.Roles.{Broker, Caller, Dealer, Peer} - alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome} - alias Wampex.Router - alias Wampex.Serializers.JSON - alias Broker.{Subscribed, Published, Event, Unsubscribed} - alias Dealer.{Invocation, Registered, Result, Unregistered} - alias Router.Authentication - - @enforce_keys [:transport, :transport_pid] - defstruct [ - :id, - :db, - :proxy, - :transport, - :transport_pid, - :message, - :hello, - :authenticate, - :register, - :unregister, - :call, - :yield, - :subscribe, - :unsubscribe, - :publish, - :realm, - :name, - :goodbye, - :peer_information, - :challenge, - error: "wamp.error.protocol_violation", - authentication: Authentication, - roles: [Peer, Broker, Dealer], - registrations: [], - subscriptions: [], - invocations: [], - message_queue: [], - request_id: 0, - requests: [], - hello_received: false - ] - - @type t :: %__MODULE__{ - id: integer() | nil, - db: module() | nil, - proxy: module() | nil, - transport_pid: pid() | module() | nil, - message: Wampex.message() | nil, - hello: Wampex.hello() | nil, - authenticate: Wampex.authenticate() | nil, - register: Wampex.register() | nil, - unregister: Wampex.unregister() | nil, - call: Wampex.call() | nil, - yield: Wampex.yield() | nil, - subscribe: Wampex.subscribe() | nil, - unsubscribe: Wampex.unsubscribe() | nil, - publish: Wampex.publish() | nil, - goodbye: binary() | nil, - realm: String.t() | nil, - name: module() | nil, - challenge: map() | nil, - roles: [module()], - registrations: [], - subscriptions: [], - invocations: [], - message_queue: [], - request_id: integer(), - requests: [], - hello_received: boolean() - } - - ## States - @init "Init" - @established "Established" - @hello "Hello" - @authenticate "Authenticate" - @call "Call" - @yield "Yield" - @register "Register" - @unregister "Unregister" - @subscribe "Subscribe" - @unsubscribe "Unsubscribe" - @publish "Publish" - # @error "Error" - @abort "Abort" - @goodbye "Goodbye" - - ## Resources - @handle_init "HandleInit" - @handle_established "HandleEstablished" - @handle_message "HandleMessage" - @handle_hello "HandleHello" - @handle_authenticate "HandleAuthenticate" - @handle_call "HandleCall" - @handle_yield "HandleYield" - @handle_register "HandleRegister" - @handle_unregister "HandleUnregister" - @handle_subscribe "HandleSubscribe" - @handle_unsubscribe "HandleUnsubscribe" - @handle_publish "HandlePublish" - # @handle_interrupt "HandleInterrupt" - @handle_abort "HandleAbort" - @handle_goodbye "HandleGoodbye" - # @handle_cancel "HandleCancel" - - def get_global_id do - Enum.random(0..@max_id) - end - - @impl true - def handle_resource( - @handle_init, - _, - @init, - data - ) do - debug("Init") - - {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, - [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_established, - _, - @established, - data - ) do - debug("Established") - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_message, - _, - _, - %SL{data: %Sess{message: msg, roles: roles}} = data - ) do - debug("Handling Message #{inspect(msg)}") - - {data, actions} = - case handle_message(msg, roles) do - {actions, _id, response} -> - {data, response} = maybe_update_response(data, response) - debug("Response: #{inspect(response)}") - {data, actions} - - nil -> - {data, [{:next_event, :internal, :abort}]} - end - - {:ok, data, actions} - end - - @impl true - def handle_resource( - @handle_hello, - _, - @hello, - %SL{ - data: - %Sess{ - id: id, - transport: tt, - transport_pid: t, - hello: {:hello, realm, dets}, - hello_received: false, - authentication: auth - } = data - } = sl - ) do - Logger.info("Hello #{inspect(dets)}") - - {actions, challenge} = - case dets do - %{"authid" => ai, "authmethods" => am} -> - case auth.authenticate?(am) do - true -> - ch = auth.challenge(realm, ai, id) - - chal = %Challenge{ - auth_method: auth.method(), - options: ch - } - - send_to_peer( - Peer.challenge(chal), - tt, - t - ) - - {[], ch} - - false -> - {[{:next_event, :internal, :abort}], nil} - end - - %{} -> - {[{:next_event, :internal, :abort}], nil} - end - - {:ok, - %SL{ - sl - | data: %Sess{ - data - | challenge: challenge, - hello_received: true, - realm: realm, - peer_information: dets - } - }, [{:next_event, :internal, :transition}] ++ actions} - end - - @impl true - def handle_resource( - @handle_authenticate, - _, - @authenticate, - %SL{ - data: %Sess{ - id: session_id, - transport: tt, - transport_pid: t, - authentication: auth, - challenge: challenge, - realm: realm, - authenticate: {:authenticate, sig, _dets} - } - } = sl - ) do - ch = JSON.deserialize!(challenge.challenge) - authid = get_in(ch, ["authid"]) - authrole = get_in(ch, ["authrole"]) - authmethod = get_in(ch, ["authmethod"]) - authprovider = get_in(ch, ["authprovider"]) - - actions = - case auth.authenticate(sig, realm, authid, challenge) do - true -> - send_to_peer( - Peer.welcome(%Welcome{ - session_id: session_id, - options: %{ - agent: "WAMPex Router", - authid: authid, - authrole: authrole, - authmethod: authmethod, - authprovider: authprovider, - roles: %{broker: %{}, dealer: %{}} - } - }), - tt, - t - ) - - [{:next_event, :internal, :transition}] - - false -> - [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}] - end - - {:ok, sl, actions} - end - - @impl true - def handle_resource( - @handle_hello, - _, - @hello, - %SL{ - data: - %Sess{ - hello_received: true - } = data - } = sl - ) do - {:ok, - %SL{ - sl - | data: %Sess{ - data - | error: "wamp.error.protocol_violation" - } - }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]} - end - - @impl true - def handle_resource( - @handle_subscribe, - _, - @subscribe, - %SL{ - data: - %Sess{ - transport: tt, - transport_pid: t, - subscriptions: subs, - realm: realm, - subscribe: {:subscribe, ri, opts, topic}, - db: db - } = data - } = sl - ) do - id = get_global_id() - - wc = - case opts do - %{"match" => "wildcard"} -> - ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "") - true - - _ -> - ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}}) - false - end - - send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t) - - {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}}, - [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_unsubscribe, - _, - @unsubscribe, - %SL{ - data: %Sess{ - realm: realm, - transport: tt, - transport_pid: t, - db: db, - subscriptions: subs, - unsubscribe: {:unsubscribe, rid, subscription_id} - } - } = sl - ) do - subs = remove_subscription(subs, subscription_id, realm, db) - - send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t) - - {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, - [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_publish, - _, - @publish, - %SL{ - data: %Sess{ - proxy: proxy, - realm: realm, - transport: tt, - transport_pid: t, - db: db, - publish: {:publish, rid, opts, topic, arg_l, arg_kw} - } - } = sl - ) do - pub_id = get_global_id() - - {_, _, subs} = - {db, {realm, topic}, []} - |> get_subscribers() - |> get_prefix() - |> get_wildcard() - - subs = Enum.uniq(subs) - - Enum.each(subs, fn {id, {pid, node}} -> - send( - {proxy, node}, - {%Event{ - subscription_id: id, - publication_id: pub_id, - arg_list: arg_l, - arg_kw: arg_kw, - options: opts - }, pid} - ) - end) - - case opts do - %{acknowledge: true} -> - send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t) - - %{} -> - :noop - end - - {:ok, sl, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_unregister, - _, - @unregister, - %SL{ - data: - %Sess{ - db: db, - transport: tt, - transport_pid: t, - registrations: regs, - realm: realm, - unregister: {:unregister, request_id, registration_id} - } = data - } = sl - ) do - regs = remove_registration(regs, realm, registration_id, db) - - send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t) - {:ok, %SL{sl | data: %Sess{data | registrations: regs}}} - end - - @impl true - def handle_resource( - @handle_register, - _, - @register, - %SL{ - data: - %Sess{ - transport: tt, - transport_pid: t, - registrations: regs, - db: db, - realm: realm, - register: {:register, rid, _opts, procedure} - } = data - } = sl - ) do - id = get_global_id() - ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}}) - regd = %Registered{request_id: rid, registration_id: id} - send_to_peer(Dealer.registered(regd), tt, t) - - {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}}, - [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_call, - _, - @call, - %SL{ - data: - %Sess{ - db: db, - proxy: proxy, - transport: tt, - transport_pid: t, - realm: realm, - request_id: ri, - call: {:call, call_id, dets, proc, al, akw} - } = data - } = sl - ) do - data = - with {_key, callees} <- ClusterKV.get(db, realm, proc, 500), - {id, {pid, node}} <- get_live_callee(proxy, callees) do - req_id = get_request_id(ri) - dets = Map.put(dets, "procedure", proc) - - send( - {proxy, node}, - { - { - call_id, - %Invocation{ - request_id: req_id, - registration_id: id, - options: dets, - arg_list: al, - arg_kw: akw - }, - {self(), Node.self()} - }, - pid - } - ) - - %SL{sl | data: %Sess{data | request_id: req_id}} - else - {:error, :no_live_callees} -> - send_to_peer( - Caller.call_error(%Error{ - request_id: call_id, - error: "wamp.error.no_callees", - details: %{procedure: proc} - }), - tt, - t - ) - - sl - - :not_found -> - send_to_peer( - Caller.call_error(%Error{ - request_id: call_id, - error: "wamp.error.no_registration", - details: %{procedure: proc} - }), - tt, - t - ) - - sl - end - - {:ok, data, [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_yield, - _, - @yield, - %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = - data - ) do - {call_id, _, {pid, node}} = - Enum.find(inv, fn - {_, %Invocation{request_id: ^id}, _} -> true - _ -> false - end) - - send( - {proxy, node}, - {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid} - ) - - inv = - Enum.filter(inv, fn - {_, %Invocation{request_id: ^id}, _} -> false - _ -> true - end) - - {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, - [{:next_event, :internal, :transition}]} - end - - @impl true - def handle_resource( - @handle_abort, - _, - @abort, - %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data - ) do - send_to_peer(Peer.abort(%Abort{reason: er}), tt, t) - Logger.warn("Aborting: #{data.data.error}") - {:ok, data, []} - end - - @impl true - def handle_resource( - @handle_goodbye, - _, - @goodbye, - %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data - ) do - send_to_peer(Peer.goodbye(goodbye), tt, t) - {:ok, data, []} - end - - @impl true - def handle_resource(resource, _, state, data) do - Logger.error("No specific resource handler for #{resource} in state #{state}") - {:ok, data, []} - end - - @impl true - def handle_info( - {call_id, %Invocation{} = e, from}, - _, - %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data - ) do - send_to_peer(Dealer.invocation(e), tt, t) - {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []} - end - - @impl true - def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do - send_to_peer(Broker.event(e), tt, t) - {:ok, data, []} - end - - @impl true - def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do - send_to_peer(Dealer.result(e), tt, t) - {:ok, data, []} - end - - @impl true - def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do - {:ok, %SL{data | data: %Sess{sess | message: message}}, - [{:next_event, :internal, :message_received}]} - end - - @impl true - def handle_info(event, state, data) do - Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}") - {:ok, data, []} - end - - @impl true - def handle_termination(reason, _, %SL{ - data: %Sess{ - db: db, - transport: tt, - transport_pid: t, - registrations: regs, - realm: realm, - subscriptions: subs - } - }) do - Logger.warn("Router session terminating #{inspect(reason)}") - - Enum.each(regs, fn {id, _proc} -> - remove_registration(regs, realm, id, db) - end) - - Enum.each(subs, fn {id, _proc, _} -> - remove_subscription(subs, id, realm, db) - end) - - send_to_peer( - Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}), - tt, - t - ) - end - - defp remove_subscription(subs, subscription_id, realm, db) do - {id, topic, wc} = - sub = - Enum.find(subs, fn - {^subscription_id, _topic, _} -> true - _ -> false - end) - - {key, filter} = - case wc do - true -> - key = ClusterKV.get_wildcard_key(topic, ".", ":", "") - - {key, - fn {id, values}, value -> - {id, - Enum.filter(values, fn - {_, ^value} -> false - _ -> true - end)} - end} - - false -> - {topic, - fn {id, values}, value -> - {id, List.delete(values, value)} - end} - end - - ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter) - List.delete(subs, sub) - end - - defp remove_registration(regs, realm, registration_id, db) do - {id, proc} = - reg = - Enum.find(regs, fn - {^registration_id, _proc} -> true - _ -> false - end) - - filter = fn {id, values}, value -> - {id, List.delete(values, value)} - end - - ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter) - List.delete(regs, reg) - end - - defp get_live_callee(_proxy, []), do: {:error, :no_live_callees} - - defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do - case GenServer.call({proxy, node}, {:is_up, pid}) do - true -> c - false -> get_live_callee(proxy, t) - end - end - - defp get_subscribers({db, {keyspace, key}, acc}) do - case ClusterKV.get(db, keyspace, key, 500) do - {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers} - _ -> {db, {keyspace, key}, acc} - end - end - - defp get_prefix({db, {keyspace, key}, acc}) do - l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500) - {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)} - end - - defp get_wildcard({db, {keyspace, key}, acc}) do - l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "") - {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)} - end - - defp send_to_peer(msg, transport, pid) do - transport.send_request(pid, remove_nil_values(msg)) - end - - defp maybe_update_response(data, {:update, key, resp}) do - {%SL{data | data: Map.put(data.data, key, resp)}, resp} - end - - defp maybe_update_response(data, resp), do: {data, resp} - - defp remove_nil_values(message) do - message = - case Enum.reverse(message) do - [map | t] when map == %{} -> t - [nil | t] -> t - m -> m - end - - message = - case message do - [list | t] when list == [] -> - t - - [nil | t] -> - t - - m -> - m - end - - Enum.reverse(message) - end - - defp get_request_id(current_id) when current_id == @max_id do - 1 - end - - defp get_request_id(current_id) do - current_id + 1 - end - - defp handle_message(msg, roles) do - Enum.reduce_while(roles, nil, fn r, _ -> - try do - res = r.handle(msg) - {:halt, res} - rescue - FunctionClauseError -> {:cont, nil} - end - end) - end -end diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex deleted file mode 100644 index ae36b57..0000000 --- a/lib/router/transports/web_socket.ex +++ /dev/null @@ -1,104 +0,0 @@ -defmodule Wampex.Router.Transports.WebSocket do - @moduledoc false - @behaviour :cowboy_websocket - - require Logger - - alias __MODULE__ - alias Wampex.Router.Session - alias Wampex.Serializers.{JSON, MessagePack} - - @protocol_header "sec-websocket-protocol" - @json 'wamp.2.json' - @msgpack "wamp.2.msgpack" - - defstruct [:serializer, :session] - - def send_request(transport, message) do - send(transport, {:send_request, message}) - end - - @impl true - def init(req, %{db: db, proxy: proxy}) do - {:ok, serializer, protocol} = get_serializer(req) - req = :cowboy_req.set_resp_header(@protocol_header, protocol, req) - {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}} - end - - @impl true - def websocket_init({state, db, proxy}) do - {:ok, session} = start_session(db, proxy) - Logger.info("Websocket Initialized: #{inspect(state)}") - {:ok, %WebSocket{state | session: session}} - end - - @impl true - def websocket_handle({type, payload}, state) do - Logger.debug("Received #{type} Payload: #{payload}") - handle_payload({payload, state}) - {:ok, state, :hibernate} - end - - @impl true - def websocket_handle(:ping, state) do - {[:pong], state, :hibernate} - end - - @impl true - def websocket_handle(unknown, state) do - Logger.warn("Unknown websocket frame #{inspect(unknown)}") - {:ok, state, :hibernate} - end - - @impl true - def websocket_info({:send_request, message}, %WebSocket{serializer: s} = state) do - {[{s.data_type(), s.serialize!(message)}], state, :hibernate} - end - - @impl true - def websocket_info( - {:EXIT, _pid = session_pid, reason}, - %WebSocket{session: session_pid} = state - ) do - Logger.warn("Session ended because #{inspect(reason)}, closing connection") - {:stop, state} - end - - @impl true - def websocket_info(event, state) do - Logger.debug("Received unhandled event: #{inspect(event)}") - {:ok, state} - end - - @impl true - def terminate(reason, _req, _state) do - Logger.info("Websocket closing for reason #{inspect(reason)}") - :ok - end - - defp handle_payload({payload, %WebSocket{serializer: s} = state}) do - payload - |> s.deserialize!() - |> handle_event(state) - end - - defp handle_event(ms, %WebSocket{session: s} = state) do - send(s, {:set_message, ms}) - {:ok, state} - end - - defp start_session(db, proxy) do - Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()}) - end - - defp get_serializer(req) do - @protocol_header - |> :cowboy_req.parse_header(req) - |> parse_protocol() - end - - defp parse_protocol([]), do: Logger.error("Unknown protocol") - defp parse_protocol([@json | _]), do: {:ok, JSON, @json} - defp parse_protocol([@msgpack | _]), do: {:ok, MessagePack, @msgpack} - defp parse_protocol([_ | t]), do: parse_protocol(t) -end diff --git a/lib/transport.ex b/lib/transport.ex deleted file mode 100644 index b801962..0000000 --- a/lib/transport.ex +++ /dev/null @@ -1,13 +0,0 @@ -defmodule Wampex.Transport do - @moduledoc "Behaviour for Transports" - @callback send_request(transport :: atom() | pid(), message :: Wampex.message()) :: :ok - @callback start_link( - url: binary(), - session: Wampex.Client.Session.t(), - protocol: binary(), - serializer: module(), - opts: [] - ) :: - {:ok, pid} - | {:error, term} -end diff --git a/mix.exs b/mix.exs index 33ccc3e..3c60089 100644 --- a/mix.exs +++ b/mix.exs @@ -37,23 +37,13 @@ defmodule Wampex.MixProject do defp deps do [ - # {:cluster_kv, path: "../cluster_kv"}, - {:cluster_kv, - git: "https://gitlab.com/entropealabs/cluster_kv.git", - tag: "4c36b8d68f40711cd167edb9917d32a992f49561"}, - {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, - {:ecto_sql, "~> 3.0"}, {:ex_doc, "~> 0.21", only: :dev, runtime: false}, {:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false}, {:jason, "~> 1.1"}, {:msgpack, "~> 0.7.0"}, - {:pbkdf2, "~> 2.0"}, - {:plug_cowboy, "~> 2.1"}, - {:postgrex, ">= 0.0.0"}, - {:states_language, "~> 0.2"}, - {:websockex, "~> 0.4"} + {:pbkdf2, "~> 2.0"} ] end diff --git a/priv/repo/migrations/20200319210508_create_realms.exs b/priv/repo/migrations/20200319210508_create_realms.exs index 0ec59d6..86f165b 100644 --- a/priv/repo/migrations/20200319210508_create_realms.exs +++ b/priv/repo/migrations/20200319210508_create_realms.exs @@ -18,7 +18,7 @@ defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do add(:salt, :string, null: false) add(:iterations, :integer, null: false) add(:keylen, :integer, null: false) - add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all)) + add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false) timestamps() end diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 4e874f9..b3fbc6a 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -351,6 +351,21 @@ defmodule WampexTest do assert is_binary(id) end + @tag :client + test "admin callee is invoked and responds with error" do + caller_name = TestAdminErrorCaller + Client.start_link(name: caller_name, session: @session) + + assert {:error, 48, "Error registering user", %{}, [], %{}} = + Client.send_request( + caller_name, + Caller.call(%Call{ + procedure: "admin.create_user", + arg_kw: %{authid: "chris", password: "woot!", realm: "not.real"} + }) + ) + end + @tag :client test "callee is invoked and responds and caller gets result" do callee_name = TestCalleeRespond