+++ /dev/null
-defmodule Wampex.Authentication do
- @moduledoc false
-
- alias Wampex.Crypto
-
- @enforce_keys [:authid, :authmethods, :secret]
- defstruct [:authid, :authmethods, :secret]
-
- @callback handle(challenge :: binary(), auth :: __MODULE__.t()) :: {binary(), map()}
-
- @type t :: %__MODULE__{
- authid: binary(),
- authmethods: [binary()],
- secret: binary()
- }
-
- def handle(
- {"wampcra", %{"challenge" => ch, "salt" => salt, "iterations" => it, "keylen" => len}},
- auth
- ) do
- {auth
- |> Map.get(:secret)
- |> Crypto.pbkdf2(salt, it, len)
- |> Crypto.hash_challenge(ch), %{}}
- end
-
- def handle({"wampcra", %{"challenge" => ch}}, auth) do
- {auth
- |> Map.get(:secret)
- |> Crypto.hash_challenge(ch), %{}}
- end
-end
+++ /dev/null
-defmodule Wampex.Client do
- @moduledoc """
- Documentation for Wampex.
- """
- use Supervisor
-
- alias Wampex.Client.Session, as: Sess
- alias Wampex.Roles.{Callee, Subscriber}
- alias Wampex.Roles.Callee.Register
- alias Wampex.Roles.Subscriber.Subscribe
-
- @spec start_link(name: atom(), session_data: Sess.t()) ::
- {:ok, pid()}
- | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
- def start_link(name: name, session: session_data) when is_atom(name) do
- Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
- end
-
- @spec init({atom(), Sess.t()}) ::
- {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
- def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
- session = session_name(name)
- subscriber_registry = subscriber_registry_name(name)
- callee_registry = callee_registry_name(name)
- transport = transport_name(name)
- session_data = %Sess{session_data | name: name, transport_pid: transport}
-
- children = [
- {Sess, [{:local, session}, session_data, []]},
- {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
- {Registry,
- [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
- {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
- ]
-
- Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
- end
-
- @spec session_name(module()) :: module()
- def session_name(name), do: Module.concat([name, Session])
- @spec subscriber_registry_name(module()) :: module()
- def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
- @spec callee_registry_name(module()) :: module()
- def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
- @spec transport_name(module()) :: module()
- def transport_name(name), do: Module.concat([name, Transport])
-
- @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
- {:ok, integer()}
- def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
- Registry.register(subscriber_registry_name(name), id, t)
- {:ok, id}
- end
-
- @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
- {:ok, integer()}
- def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
- Registry.register(callee_registry_name(name), id, p)
- {:ok, id}
- end
-
- @spec send_request(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
- term()
- def send_request(name, request, timeout \\ 5000) do
- Sess.send_request(session_name(name), request, timeout)
- end
-
- @spec cast_send_request(name :: module(), Wampex.message()) :: :ok
- def cast_send_request(name, request) do
- Sess.cast_send_request(session_name(name), request)
- end
-end
+++ /dev/null
-defmodule Wampex.Client.Session do
- @moduledoc """
- A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
- """
- use StatesLanguage, data: "priv/client.json"
-
- @max_id 9_007_199_254_740_992
-
- alias StatesLanguage, as: SL
- alias Wampex.Realm
- alias Wampex.Roles.Peer
- alias Wampex.Roles.Peer.{Authenticate, Hello}
- alias Wampex.Serializers.MessagePack
- alias __MODULE__, as: Sess
- alias Wampex.Client
- alias Wampex.Client.Transports.WebSocket
-
- @yield 70
- @hello 1
- @abort 3
-
- @enforce_keys [:url, :roles]
-
- defstruct [
- :id,
- :url,
- :transport_pid,
- :message,
- :event,
- :invocation,
- :goodbye,
- :error,
- :realm,
- :name,
- :roles,
- :challenge,
- :interrupt,
- message_queue: [],
- request_id: 0,
- protocol: "wamp.2.msgpack",
- transport: WebSocket,
- serializer: MessagePack,
- requests: []
- ]
-
- @type t :: %__MODULE__{
- id: integer() | nil,
- url: binary(),
- transport_pid: pid() | module() | nil,
- message: Wampex.message() | nil,
- event: Wampex.event() | nil,
- invocation: Wampex.invocation() | nil,
- goodbye: binary() | nil,
- realm: Realm.t(),
- name: module() | nil,
- roles: [module()],
- challenge: {binary(), binary()} | nil,
- interrupt: integer() | nil,
- message_queue: [],
- request_id: integer(),
- protocol: binary(),
- transport: module(),
- serializer: module(),
- requests: []
- }
-
- ## States
- @established "Established"
- @init "Init"
- @challenge "Challenge"
- @abort "Abort"
- @goodbye "Goodbye"
- @event "Event"
- @invocation "Invocation"
- @interrupt "Interrupt"
- @handshake "Handshake"
-
- ## Resources
- @init_transport "InitTransport"
- @wait_transport "WaitForTransport"
- @hello "Hello"
- @handle_challenge "HandleChallenge"
- @handle_established "HandleEstablished"
- @handle_abort "HandleAbort"
- @handle_goodbye "HandleGoodbye"
- @handle_event "HandleEvent"
- @handle_invocation "HandleInvocation"
- @handle_message "HandleMessage"
- @handle_interrupt "HandleInterrupt"
- @handle_handshake "HandleHandshake"
-
- @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) ::
- :ok
- def cast_send_request(name, request, pid) do
- __MODULE__.cast(name, {:send_request, request, pid})
- end
-
- @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
- def cast_send_request(name, request) do
- __MODULE__.cast(name, {:send_request, request})
- end
-
- @spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) ::
- term()
- def send_request(name, request, timeout) do
- __MODULE__.call(name, {:send_request, request}, timeout)
- end
-
- @impl true
- def handle_call(
- {:send_request, request},
- from,
- @established,
- %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
- ) do
- request_id = do_send(r_id, tt, t, request)
-
- {:ok,
- %SL{
- data
- | data: %Sess{
- sess
- | request_id: request_id,
- requests: [{request_id, from} | sess.requests]
- }
- }, []}
- end
-
- @impl true
- def handle_call(
- {:send_request, request},
- from,
- _,
- %SL{data: %Sess{message_queue: mq} = sess} = data
- ) do
- Logger.debug("Queueing request: #{inspect(request)}")
- {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []}
- end
-
- @impl true
- def handle_cast(
- {:send_request, request, from},
- @established,
- %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
- ) do
- request_id = do_send(r_id, tt, t, request)
-
- {:ok,
- %SL{
- data
- | data: %Sess{
- sess
- | request_id: request_id,
- requests: [{request_id, from} | sess.requests]
- }
- }, []}
- end
-
- @impl true
- def handle_cast(
- {:send_request, request},
- @established,
- %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
- ) do
- request_id = do_send(r_id, tt, t, request)
-
- {:ok,
- %SL{
- data
- | data: %Sess{sess | request_id: request_id}
- }, []}
- end
-
- @impl true
- def handle_cast(
- {:send_request, request},
- _,
- %SL{data: %Sess{message_queue: mq} = sess} = data
- ) do
- Logger.debug("Queueing request: #{inspect(request)}")
- {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []}
- end
-
- @impl true
- def handle_resource(
- @init_transport,
- _,
- @wait_transport,
- data
- ) do
- Logger.debug("Waiting for transport to connect...")
- {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
- end
-
- @impl true
- def handle_resource(
- @hello,
- _,
- @init,
- %SL{
- data: %Sess{
- transport: tt,
- transport_pid: t,
- roles: roles,
- realm: %Realm{name: realm, authentication: auth}
- }
- } = data
- ) do
- message = maybe_authentication(%Hello{realm: realm, roles: roles}, auth)
- tt.send_request(t, Peer.hello(message))
- {:ok, data, [{:next_event, :internal, :hello_sent}]}
- end
-
- @impl true
- def handle_resource(
- @handle_challenge,
- _,
- @challenge,
- %SL{
- data: %Sess{
- transport: tt,
- transport_pid: t,
- challenge: challenge,
- realm: %Realm{authentication: auth}
- }
- } = sl
- ) do
- {signature, extra} = auth.__struct__.handle(challenge, auth)
- tt.send_request(t, Peer.authenticate(%Authenticate{signature: signature, extra: extra}))
- {:ok, sl, [{:next_event, :internal, :challenged}]}
- end
-
- @impl true
- def handle_resource(
- @goodbye,
- _,
- @handle_goodbye,
- %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
- ) do
- tt.send_message(t, Peer.goodbye(goodbye))
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_message,
- _,
- _,
- %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
- ) do
- Logger.debug("Handling Message #{inspect(msg)}")
- {actions, id, response} = handle_message(msg, roles)
- {%SL{data: sess} = data, response} = maybe_update_response(data, response)
- {requests, actions} = resp = handle_response(id, actions, requests, response)
- Logger.debug("Response: #{inspect(resp)}")
- {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
- end
-
- @impl true
- def handle_resource(@handle_abort, _, @abort, data) do
- Logger.warn("Aborting: #{data.data.error}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_established,
- _,
- @established,
- %SL{
- data:
- %Sess{
- request_id: r_id,
- transport: tt,
- transport_pid: t,
- message_queue: mq,
- requests: reqs
- } = sess
- } = data
- ) do
- {request_id, requests} = send_message_queue(r_id, mq, tt, t, reqs)
- requests = remove_cast_requests(requests)
-
- {:ok,
- %SL{
- data
- | data: %Sess{sess | requests: requests, request_id: request_id, message_queue: []}
- }, []}
- end
-
- @impl true
- def handle_resource(@handle_handshake, _, @handshake, data) do
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_event,
- _,
- @event,
- %SL{data: %Sess{name: name, event: event}} = sl
- ) do
- Logger.debug("Received Event #{inspect(event)}")
-
- sub = Client.subscriber_registry_name(name)
-
- Registry.dispatch(sub, elem(event, 1), fn entries ->
- for {pid, _topic} <- entries, do: send(pid, event)
- end)
-
- {:ok, sl, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_invocation,
- _,
- @invocation,
- %SL{data: %Sess{name: name, invocation: invocation}} = sl
- ) do
- Logger.debug("Received Invocation #{inspect(invocation)}")
- reg = Client.callee_registry_name(name)
-
- Registry.dispatch(reg, elem(invocation, 2), fn entries ->
- for {pid, _procedure} <- entries, do: send(pid, invocation)
- end)
-
- {:ok, sl, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_interrupt,
- _,
- @interrupt,
- %SL{data: %Sess{interrupt: {id, opts}, name: name}} = data
- ) do
- [{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id)
- send(callee, {:interrupt, id, opts})
- {:ok, data, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(resource, _, state, data) do
- Logger.error("No specific resource handler for #{resource} in state #{state}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
- {:ok, %SL{data | data: %Sess{sess | message: message}},
- [{:next_event, :internal, :message_received}]}
- end
-
- defp handle_response(nil, actions, requests, _), do: {requests, actions}
-
- defp handle_response(id, actions, requests, response) do
- Enum.reduce_while(requests, {requests, actions}, fn
- {^id, nil}, _ ->
- {:halt, {remove_request(id, requests), actions}}
-
- {^id, from}, _ ->
- {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
-
- {_, _}, acc ->
- {:cont, acc}
- end)
- end
-
- defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
- send(from, {:progress, arg_l, arg_kw})
- []
- end
-
- defp get_response_action(from, response), do: {:reply, from, response}
-
- defp maybe_update_response(data, {:update, key, resp}) do
- {%SL{data | data: Map.put(data.data, key, resp)}, resp}
- end
-
- defp maybe_update_response(data, resp), do: {data, resp}
-
- defp do_send(r_id, tt, t, message) do
- {request_id, message} = maybe_inject_request_id(r_id, message)
- tt.send_request(t, remove_nil_values(message))
- request_id
- end
-
- defp maybe_authentication(message, nil), do: message
-
- defp maybe_authentication(%Hello{} = message, auth) do
- %Hello{message | options: Map.from_struct(auth)}
- end
-
- defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
-
- defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
- defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message}
- defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message}
-
- defp maybe_inject_request_id(r_id, message) do
- request_id = get_request_id(r_id)
- {request_id, List.insert_at(message, 1, request_id)}
- end
-
- defp get_request_id(current_id) when current_id == @max_id do
- 1
- end
-
- defp get_request_id(current_id) do
- current_id + 1
- end
-
- defp remove_request(id, requests) do
- Enum.filter(requests, fn
- {^id, _} -> false
- {_id, _} -> true
- end)
- end
-
- defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}
-
- defp send_message_queue(r_id, mq, tt, t, reqs) do
- mq
- |> Enum.reverse()
- |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
- r = do_send(id, tt, t, request)
- {r, [{r, from} | requests]}
- end)
- end
-
- def remove_cast_requests([]), do: []
-
- def remove_cast_requests(requests) do
- Enum.filter(requests, fn
- {_, nil} -> false
- {_, _} -> true
- end)
- end
-
- defp handle_message(msg, roles) do
- Enum.reduce_while(roles, nil, fn r, _ ->
- try do
- res = r.handle(msg)
- {:halt, res}
- rescue
- FunctionClauseError -> {:cont, nil}
- end
- end)
- end
-end
+++ /dev/null
-defmodule Wampex.Client.Transports.WebSocket do
- use WebSockex
-
- @behaviour Wampex.Transport
-
- alias WebSockex.Conn
-
- require Logger
-
- @header "Sec-WebSocket-Protocol"
- @ping_interval 20_000
-
- @impl true
- def send_request(t, data) do
- WebSockex.cast(t, {:send_request, data})
- end
-
- @impl true
- def start_link(
- url: url,
- session: session,
- protocol: protocol,
- serializer: serializer,
- opts: opts
- ) do
- url
- |> Conn.new(extra_headers: [{@header, protocol}])
- |> WebSockex.start_link(
- __MODULE__,
- %{session: session, serializer: serializer, connected: false},
- opts
- )
- end
-
- @impl true
- def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
- Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}")
- Process.send_after(self(), :ping, @ping_interval)
- send(state.session, {:connected, true})
- {:ok, %{state | connected: true}}
- end
-
- @impl true
- def handle_ping(_ping, state) do
- {:reply, :pong, state}
- end
-
- @impl true
- def handle_pong(_pong, state) do
- {:ok, state}
- end
-
- @impl true
- def handle_cast({:send_request, data}, %{serializer: s} = state) do
- Logger.debug("Sending Request #{inspect(data)}")
- {:reply, {s.data_type(), s.serialize!(data)}, state}
- end
-
- @impl true
- def handle_cast({:connected, resp}, state) do
- send(resp, {:connected, state.connected})
- {:ok, state}
- end
-
- @impl true
- def handle_info(:ping, state) do
- Process.send_after(self(), :ping, @ping_interval)
- {:reply, :ping, state}
- end
-
- @impl true
- def handle_frame({type, message}, %{serializer: s} = state) do
- message = s.deserialize!(message)
- Logger.debug("Received #{type} data: #{inspect(message)}")
- send(state.session, {:set_message, message})
- {:ok, state}
- end
-end
+++ /dev/null
-defmodule Wampex.Realm do
- @moduledoc "Defines the WAMP Realm"
- alias Wampex.Authentication
-
- @enforce_keys [:name]
- defstruct [:name, :authentication]
-
- @type t :: %__MODULE__{
- name: binary(),
- authentication: Authentication.t() | nil
- }
-end
+++ /dev/null
-defmodule Wampex.Router do
- use Supervisor
-
- alias Wampex.Router.{Admin, Authentication, Proxy, REST}
- alias Wampex.Router.Authentication.EnsureDefaultAdmin
- alias Wampex.Router.Transports.WebSocket
-
- @spec start_link(
- name: module(),
- port: pos_integer(),
- topologies: [],
- replicas: integer(),
- quorum: integer(),
- admin_realm: String.t(),
- admin_authid: String.t(),
- admin_password: String.t()
- ) ::
- {:ok, pid()}
- | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
- def start_link(
- name: name,
- port: port,
- topologies: topologies,
- replicas: repls,
- quorum: quorum,
- admin_realm: admin_realm,
- admin_authid: admin_authid,
- admin_password: admin_password
- )
- when is_atom(name) do
- Supervisor.start_link(
- __MODULE__,
- {name, port, topologies, repls, quorum, admin_realm, admin_authid, admin_password},
- name: name
- )
- end
-
- @spec init(
- {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(),
- String.t()}
- ) ::
- {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
- def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do
- children = [
- Authentication.Repo,
- {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]},
- {ClusterKV,
- [
- name: db_name(name),
- topologies: topologies,
- replicas: replicas,
- quorum: quorum
- ]},
- {Proxy, [name: proxy_name(name)]},
- {Admin,
- [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]},
- {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
- ]
-
- Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10)
- end
-
- def db_name(name), do: Module.concat([name, KV])
- def proxy_name(name), do: Module.concat([name, Proxy])
- def admin_name(name), do: Module.concat([name, Admin])
-
- defp dispatch(name) do
- [
- {:_,
- [
- {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
- {:_, Plug.Cowboy.Handler, {REST, []}}
- ]}
- ]
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Admin do
- @moduledoc false
- use GenServer
- require Logger
- alias Wampex.Roles.Dealer.{Invocation, Result}
- alias Wampex.Router.Authentication.{User, Realm}
- alias Wampex.Router.Session
-
- @procedures [
- "admin.create_user",
- "admin.create_realm"
- ]
-
- def start_link(name: name, db: db, realm: realm, proxy: proxy) do
- GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name)
- end
-
- def init({db, realm, proxy}) do
- {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}}
- end
-
- def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do
- regs = register_procedures(db, realm, regs)
- {:noreply, %{state | regs: regs}}
- end
-
- def register_procedures(db, realm, regs) do
- Enum.reduce(@procedures, regs, fn proc, acc ->
- val = {Session.get_global_id(), {self(), Node.self()}}
- ClusterKV.put(db, realm, proc, val)
- [{proc, val} | acc]
- end)
- end
-
- def handle_info(
- {req_id,
- %Invocation{
- arg_kw: %{"realm" => realm, "authid" => authid, "password" => password},
- options: %{"procedure" => "admin.create_user"}
- } = event, {pid, node}},
- %{proxy: proxy} = state
- ) do
- realm = Realm.get(uri: realm)
- %User{id: id} = User.create(authid: authid, password: password, realm: realm)
- Logger.info("Admin handled event: #{inspect(event)}")
- send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid})
- {:noreply, state}
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication do
- @moduledoc false
-
- require Logger
-
- alias Wampex.Crypto
- alias Wampex.Serializers.JSON
- alias Wampex.Router.Authentication.{Realm, User}
-
- @wampcra "wampcra"
- @auth_provider "userdb"
- @auth_role "user"
-
- def authenticate?(methods) do
- can_auth(methods)
- end
-
- def method, do: @wampcra
-
- def challenge(realm, authid, session_id) do
- %Realm{} = realm = Realm.get(uri: realm)
- %User{} = user = User.get(authid: authid, realm: realm)
- now = DateTime.to_iso8601(DateTime.utc_now())
-
- %{
- challenge:
- JSON.serialize!(%{
- nonce: Crypto.random_string(user.keylen),
- authprovider: @auth_provider,
- authid: authid,
- timestamp: now,
- authrole: @auth_role,
- authmethod: @wampcra,
- session: session_id
- }),
- salt: user.salt,
- keylen: user.keylen,
- iterations: user.iterations
- }
- end
-
- def authenticate(signature, realm, authid, %{
- challenge: challenge
- }) do
- authid
- |> get_secret(realm)
- |> Crypto.hash_challenge(challenge)
- |> :pbkdf2.compare_secure(signature)
- end
-
- defp get_secret(authid, uri) do
- realm = Realm.get(uri: uri)
- %User{password: password} = User.get(authid: authid, realm: realm)
- password
- end
-
- defp can_auth([]), do: false
- defp can_auth([@wampcra | _]), do: true
- defp can_auth([_ | t]), do: can_auth(t)
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.EnsureDefaultAdmin do
- @moduledoc false
- require Logger
- use GenServer
-
- alias Wampex.Router.Authentication.{Realm, User}
-
- def start_link(uri: uri, authid: authid, password: password) do
- GenServer.start_link(__MODULE__, {uri, authid, password})
- end
-
- def init({uri, authid, password}) do
- %Realm{} = realm = Realm.create(uri: uri)
- %User{} = user = User.create(authid: authid, password: password, realm: realm)
- Logger.info("Realm: #{inspect(realm)}")
- Logger.info("User: #{inspect(user)}")
- :ignore
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.Realm do
- @moduledoc """
- CREATE TABLE authentication.realms (
- id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
- uri STRING(255) NOT NULL UNIQUE,
- parent STRING NULL,
- inserted_at TIMESTAMP NOT NULL,
- updated_at TIMESTAMP NOT NULL
- );
-
- """
- use Ecto.Schema
-
- alias __MODULE__
- alias Wampex.Router.Authentication.Repo
-
- @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
- @foreign_key_type :binary_id
- schema "realms" do
- field(:uri, :string)
- field(:parent, :string)
- timestamps()
- end
-
- def get(uri: uri) do
- Repo.get_by(Realm, uri: uri)
- end
-
- def create(uri: uri) do
- try do
- {:ok, r} =
- %Realm{uri: uri, parent: uri}
- |> Repo.insert()
-
- r
- rescue
- _er ->
- get(uri: uri)
- end
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.Repo do
- use Ecto.Repo,
- otp_app: :wampex,
- adapter: Ecto.Adapters.Postgres
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.User do
- @moduledoc """
- CREATE TABLE authentication.users (
- id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
- authid STRING(255) NOT NULL,
- password STRING NOT NULL,
- salt STRING NOT NULL,
- iterations INT NOT NULL,
- keylen INT NOT NULL,
- realm_id UUID NOT NULL REFERENCES authentication.realms (id) ON DELETE CASCADE,
- inserted_at TIMESTAMP NOT NULL,
- updated_at TIMESTAMP NOT NULL,
- UNIQUE (authid, realm_id),
- INDEX (authid)
- );
-
- """
-
- use Ecto.Schema
- alias __MODULE__
- alias Ecto.Migration
- alias Wampex.Crypto
- alias Wampex.Router.Authentication.{Realm, Repo}
-
- @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
- @foreign_key_type :binary_id
- schema "users" do
- field(:authid, :string)
- field(:password, :string)
- field(:salt, :string)
- field(:iterations, :integer)
- field(:keylen, :integer)
- belongs_to(:realm, Realm)
- timestamps()
- end
-
- def get(authid: authid, realm: realm) do
- Repo.get_by(User, authid: authid, realm_id: realm.id)
- end
-
- def create(authid: authid, password: password, realm: realm) do
- keylen = Application.get_env(:wampex, :keylen)
- salt = Crypto.random_string(keylen)
- iterations = Enum.random(10_000..50_000)
-
- try do
- password = Crypto.pbkdf2(password, salt, iterations, keylen)
-
- {:ok, u} =
- %User{
- authid: authid,
- password: password,
- realm: realm,
- salt: salt,
- iterations: iterations,
- keylen: keylen
- }
- |> Repo.insert()
-
- u
- rescue
- _er ->
- get(authid: authid, realm: realm)
- end
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Proxy do
- @moduledoc false
- use GenServer
-
- def start_link(name: name) do
- GenServer.start_link(__MODULE__, :ok, name: name)
- end
-
- def init(:ok) do
- {:ok, []}
- end
-
- def handle_info({e, to}, state) do
- send(to, e)
- {:noreply, state}
- end
-
- def handle_call({:is_up, pid}, _from, s), do: {:reply, Process.alive?(pid), s}
-end
+++ /dev/null
-defmodule Wampex.Router.REST do
- @moduledoc false
- use Plug.Router
- import Plug.Conn
-
- plug(CORSPlug, origin: ["*"])
- plug(:match)
- plug(:dispatch)
-
- get "/favicon.ico" do
- send_resp(conn, 200, "ok")
- end
-
- get "/alivez" do
- send_resp(conn, 200, "ok")
- end
-
- get "/readyz" do
- send_resp(conn, 200, "ok")
- end
-
- match _ do
- send_resp(conn, 404, "oops")
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Session do
- @moduledoc """
- A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
- """
- use StatesLanguage, data: "priv/router.json"
-
- @max_id 9_007_199_254_740_992
-
- alias __MODULE__, as: Sess
- alias StatesLanguage, as: SL
- alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
- alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
- alias Wampex.Router
- alias Wampex.Serializers.JSON
- alias Broker.{Subscribed, Published, Event, Unsubscribed}
- alias Dealer.{Invocation, Registered, Result, Unregistered}
- alias Router.Authentication
-
- @enforce_keys [:transport, :transport_pid]
- defstruct [
- :id,
- :db,
- :proxy,
- :transport,
- :transport_pid,
- :message,
- :hello,
- :authenticate,
- :register,
- :unregister,
- :call,
- :yield,
- :subscribe,
- :unsubscribe,
- :publish,
- :realm,
- :name,
- :goodbye,
- :peer_information,
- :challenge,
- error: "wamp.error.protocol_violation",
- authentication: Authentication,
- roles: [Peer, Broker, Dealer],
- registrations: [],
- subscriptions: [],
- invocations: [],
- message_queue: [],
- request_id: 0,
- requests: [],
- hello_received: false
- ]
-
- @type t :: %__MODULE__{
- id: integer() | nil,
- db: module() | nil,
- proxy: module() | nil,
- transport_pid: pid() | module() | nil,
- message: Wampex.message() | nil,
- hello: Wampex.hello() | nil,
- authenticate: Wampex.authenticate() | nil,
- register: Wampex.register() | nil,
- unregister: Wampex.unregister() | nil,
- call: Wampex.call() | nil,
- yield: Wampex.yield() | nil,
- subscribe: Wampex.subscribe() | nil,
- unsubscribe: Wampex.unsubscribe() | nil,
- publish: Wampex.publish() | nil,
- goodbye: binary() | nil,
- realm: String.t() | nil,
- name: module() | nil,
- challenge: map() | nil,
- roles: [module()],
- registrations: [],
- subscriptions: [],
- invocations: [],
- message_queue: [],
- request_id: integer(),
- requests: [],
- hello_received: boolean()
- }
-
- ## States
- @init "Init"
- @established "Established"
- @hello "Hello"
- @authenticate "Authenticate"
- @call "Call"
- @yield "Yield"
- @register "Register"
- @unregister "Unregister"
- @subscribe "Subscribe"
- @unsubscribe "Unsubscribe"
- @publish "Publish"
- # @error "Error"
- @abort "Abort"
- @goodbye "Goodbye"
-
- ## Resources
- @handle_init "HandleInit"
- @handle_established "HandleEstablished"
- @handle_message "HandleMessage"
- @handle_hello "HandleHello"
- @handle_authenticate "HandleAuthenticate"
- @handle_call "HandleCall"
- @handle_yield "HandleYield"
- @handle_register "HandleRegister"
- @handle_unregister "HandleUnregister"
- @handle_subscribe "HandleSubscribe"
- @handle_unsubscribe "HandleUnsubscribe"
- @handle_publish "HandlePublish"
- # @handle_interrupt "HandleInterrupt"
- @handle_abort "HandleAbort"
- @handle_goodbye "HandleGoodbye"
- # @handle_cancel "HandleCancel"
-
- def get_global_id do
- Enum.random(0..@max_id)
- end
-
- @impl true
- def handle_resource(
- @handle_init,
- _,
- @init,
- data
- ) do
- debug("Init")
-
- {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_established,
- _,
- @established,
- data
- ) do
- debug("Established")
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_message,
- _,
- _,
- %SL{data: %Sess{message: msg, roles: roles}} = data
- ) do
- debug("Handling Message #{inspect(msg)}")
-
- {data, actions} =
- case handle_message(msg, roles) do
- {actions, _id, response} ->
- {data, response} = maybe_update_response(data, response)
- debug("Response: #{inspect(response)}")
- {data, actions}
-
- nil ->
- {data, [{:next_event, :internal, :abort}]}
- end
-
- {:ok, data, actions}
- end
-
- @impl true
- def handle_resource(
- @handle_hello,
- _,
- @hello,
- %SL{
- data:
- %Sess{
- id: id,
- transport: tt,
- transport_pid: t,
- hello: {:hello, realm, dets},
- hello_received: false,
- authentication: auth
- } = data
- } = sl
- ) do
- Logger.info("Hello #{inspect(dets)}")
-
- {actions, challenge} =
- case dets do
- %{"authid" => ai, "authmethods" => am} ->
- case auth.authenticate?(am) do
- true ->
- ch = auth.challenge(realm, ai, id)
-
- chal = %Challenge{
- auth_method: auth.method(),
- options: ch
- }
-
- send_to_peer(
- Peer.challenge(chal),
- tt,
- t
- )
-
- {[], ch}
-
- false ->
- {[{:next_event, :internal, :abort}], nil}
- end
-
- %{} ->
- {[{:next_event, :internal, :abort}], nil}
- end
-
- {:ok,
- %SL{
- sl
- | data: %Sess{
- data
- | challenge: challenge,
- hello_received: true,
- realm: realm,
- peer_information: dets
- }
- }, [{:next_event, :internal, :transition}] ++ actions}
- end
-
- @impl true
- def handle_resource(
- @handle_authenticate,
- _,
- @authenticate,
- %SL{
- data: %Sess{
- id: session_id,
- transport: tt,
- transport_pid: t,
- authentication: auth,
- challenge: challenge,
- realm: realm,
- authenticate: {:authenticate, sig, _dets}
- }
- } = sl
- ) do
- ch = JSON.deserialize!(challenge.challenge)
- authid = get_in(ch, ["authid"])
- authrole = get_in(ch, ["authrole"])
- authmethod = get_in(ch, ["authmethod"])
- authprovider = get_in(ch, ["authprovider"])
-
- actions =
- case auth.authenticate(sig, realm, authid, challenge) do
- true ->
- send_to_peer(
- Peer.welcome(%Welcome{
- session_id: session_id,
- options: %{
- agent: "WAMPex Router",
- authid: authid,
- authrole: authrole,
- authmethod: authmethod,
- authprovider: authprovider,
- roles: %{broker: %{}, dealer: %{}}
- }
- }),
- tt,
- t
- )
-
- [{:next_event, :internal, :transition}]
-
- false ->
- [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]
- end
-
- {:ok, sl, actions}
- end
-
- @impl true
- def handle_resource(
- @handle_hello,
- _,
- @hello,
- %SL{
- data:
- %Sess{
- hello_received: true
- } = data
- } = sl
- ) do
- {:ok,
- %SL{
- sl
- | data: %Sess{
- data
- | error: "wamp.error.protocol_violation"
- }
- }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]}
- end
-
- @impl true
- def handle_resource(
- @handle_subscribe,
- _,
- @subscribe,
- %SL{
- data:
- %Sess{
- transport: tt,
- transport_pid: t,
- subscriptions: subs,
- realm: realm,
- subscribe: {:subscribe, ri, opts, topic},
- db: db
- } = data
- } = sl
- ) do
- id = get_global_id()
-
- wc =
- case opts do
- %{"match" => "wildcard"} ->
- ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
- true
-
- _ ->
- ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
- false
- end
-
- send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
-
- {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_unsubscribe,
- _,
- @unsubscribe,
- %SL{
- data: %Sess{
- realm: realm,
- transport: tt,
- transport_pid: t,
- db: db,
- subscriptions: subs,
- unsubscribe: {:unsubscribe, rid, subscription_id}
- }
- } = sl
- ) do
- subs = remove_subscription(subs, subscription_id, realm, db)
-
- send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
-
- {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_publish,
- _,
- @publish,
- %SL{
- data: %Sess{
- proxy: proxy,
- realm: realm,
- transport: tt,
- transport_pid: t,
- db: db,
- publish: {:publish, rid, opts, topic, arg_l, arg_kw}
- }
- } = sl
- ) do
- pub_id = get_global_id()
-
- {_, _, subs} =
- {db, {realm, topic}, []}
- |> get_subscribers()
- |> get_prefix()
- |> get_wildcard()
-
- subs = Enum.uniq(subs)
-
- Enum.each(subs, fn {id, {pid, node}} ->
- send(
- {proxy, node},
- {%Event{
- subscription_id: id,
- publication_id: pub_id,
- arg_list: arg_l,
- arg_kw: arg_kw,
- options: opts
- }, pid}
- )
- end)
-
- case opts do
- %{acknowledge: true} ->
- send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t)
-
- %{} ->
- :noop
- end
-
- {:ok, sl, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_unregister,
- _,
- @unregister,
- %SL{
- data:
- %Sess{
- db: db,
- transport: tt,
- transport_pid: t,
- registrations: regs,
- realm: realm,
- unregister: {:unregister, request_id, registration_id}
- } = data
- } = sl
- ) do
- regs = remove_registration(regs, realm, registration_id, db)
-
- send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
- {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
- end
-
- @impl true
- def handle_resource(
- @handle_register,
- _,
- @register,
- %SL{
- data:
- %Sess{
- transport: tt,
- transport_pid: t,
- registrations: regs,
- db: db,
- realm: realm,
- register: {:register, rid, _opts, procedure}
- } = data
- } = sl
- ) do
- id = get_global_id()
- ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
- regd = %Registered{request_id: rid, registration_id: id}
- send_to_peer(Dealer.registered(regd), tt, t)
-
- {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_call,
- _,
- @call,
- %SL{
- data:
- %Sess{
- db: db,
- proxy: proxy,
- transport: tt,
- transport_pid: t,
- realm: realm,
- request_id: ri,
- call: {:call, call_id, dets, proc, al, akw}
- } = data
- } = sl
- ) do
- data =
- with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
- {id, {pid, node}} <- get_live_callee(proxy, callees) do
- req_id = get_request_id(ri)
- dets = Map.put(dets, "procedure", proc)
-
- send(
- {proxy, node},
- {
- {
- call_id,
- %Invocation{
- request_id: req_id,
- registration_id: id,
- options: dets,
- arg_list: al,
- arg_kw: akw
- },
- {self(), Node.self()}
- },
- pid
- }
- )
-
- %SL{sl | data: %Sess{data | request_id: req_id}}
- else
- {:error, :no_live_callees} ->
- send_to_peer(
- Caller.call_error(%Error{
- request_id: call_id,
- error: "wamp.error.no_callees",
- details: %{procedure: proc}
- }),
- tt,
- t
- )
-
- sl
-
- :not_found ->
- send_to_peer(
- Caller.call_error(%Error{
- request_id: call_id,
- error: "wamp.error.no_registration",
- details: %{procedure: proc}
- }),
- tt,
- t
- )
-
- sl
- end
-
- {:ok, data, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_yield,
- _,
- @yield,
- %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
- data
- ) do
- {call_id, _, {pid, node}} =
- Enum.find(inv, fn
- {_, %Invocation{request_id: ^id}, _} -> true
- _ -> false
- end)
-
- send(
- {proxy, node},
- {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
- )
-
- inv =
- Enum.filter(inv, fn
- {_, %Invocation{request_id: ^id}, _} -> false
- _ -> true
- end)
-
- {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_abort,
- _,
- @abort,
- %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data
- ) do
- send_to_peer(Peer.abort(%Abort{reason: er}), tt, t)
- Logger.warn("Aborting: #{data.data.error}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_goodbye,
- _,
- @goodbye,
- %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
- ) do
- send_to_peer(Peer.goodbye(goodbye), tt, t)
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(resource, _, state, data) do
- Logger.error("No specific resource handler for #{resource} in state #{state}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_info(
- {call_id, %Invocation{} = e, from},
- _,
- %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
- ) do
- send_to_peer(Dealer.invocation(e), tt, t)
- {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
- end
-
- @impl true
- def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
- send_to_peer(Broker.event(e), tt, t)
- {:ok, data, []}
- end
-
- @impl true
- def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
- send_to_peer(Dealer.result(e), tt, t)
- {:ok, data, []}
- end
-
- @impl true
- def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
- {:ok, %SL{data | data: %Sess{sess | message: message}},
- [{:next_event, :internal, :message_received}]}
- end
-
- @impl true
- def handle_info(event, state, data) do
- Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_termination(reason, _, %SL{
- data: %Sess{
- db: db,
- transport: tt,
- transport_pid: t,
- registrations: regs,
- realm: realm,
- subscriptions: subs
- }
- }) do
- Logger.warn("Router session terminating #{inspect(reason)}")
-
- Enum.each(regs, fn {id, _proc} ->
- remove_registration(regs, realm, id, db)
- end)
-
- Enum.each(subs, fn {id, _proc, _} ->
- remove_subscription(subs, id, realm, db)
- end)
-
- send_to_peer(
- Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
- tt,
- t
- )
- end
-
- defp remove_subscription(subs, subscription_id, realm, db) do
- {id, topic, wc} =
- sub =
- Enum.find(subs, fn
- {^subscription_id, _topic, _} -> true
- _ -> false
- end)
-
- {key, filter} =
- case wc do
- true ->
- key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
-
- {key,
- fn {id, values}, value ->
- {id,
- Enum.filter(values, fn
- {_, ^value} -> false
- _ -> true
- end)}
- end}
-
- false ->
- {topic,
- fn {id, values}, value ->
- {id, List.delete(values, value)}
- end}
- end
-
- ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
- List.delete(subs, sub)
- end
-
- defp remove_registration(regs, realm, registration_id, db) do
- {id, proc} =
- reg =
- Enum.find(regs, fn
- {^registration_id, _proc} -> true
- _ -> false
- end)
-
- filter = fn {id, values}, value ->
- {id, List.delete(values, value)}
- end
-
- ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
- List.delete(regs, reg)
- end
-
- defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
-
- defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
- case GenServer.call({proxy, node}, {:is_up, pid}) do
- true -> c
- false -> get_live_callee(proxy, t)
- end
- end
-
- defp get_subscribers({db, {keyspace, key}, acc}) do
- case ClusterKV.get(db, keyspace, key, 500) do
- {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
- _ -> {db, {keyspace, key}, acc}
- end
- end
-
- defp get_prefix({db, {keyspace, key}, acc}) do
- l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
- {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
- end
-
- defp get_wildcard({db, {keyspace, key}, acc}) do
- l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
- {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
- end
-
- defp send_to_peer(msg, transport, pid) do
- transport.send_request(pid, remove_nil_values(msg))
- end
-
- defp maybe_update_response(data, {:update, key, resp}) do
- {%SL{data | data: Map.put(data.data, key, resp)}, resp}
- end
-
- defp maybe_update_response(data, resp), do: {data, resp}
-
- defp remove_nil_values(message) do
- message =
- case Enum.reverse(message) do
- [map | t] when map == %{} -> t
- [nil | t] -> t
- m -> m
- end
-
- message =
- case message do
- [list | t] when list == [] ->
- t
-
- [nil | t] ->
- t
-
- m ->
- m
- end
-
- Enum.reverse(message)
- end
-
- defp get_request_id(current_id) when current_id == @max_id do
- 1
- end
-
- defp get_request_id(current_id) do
- current_id + 1
- end
-
- defp handle_message(msg, roles) do
- Enum.reduce_while(roles, nil, fn r, _ ->
- try do
- res = r.handle(msg)
- {:halt, res}
- rescue
- FunctionClauseError -> {:cont, nil}
- end
- end)
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Transports.WebSocket do
- @moduledoc false
- @behaviour :cowboy_websocket
-
- require Logger
-
- alias __MODULE__
- alias Wampex.Router.Session
- alias Wampex.Serializers.{JSON, MessagePack}
-
- @protocol_header "sec-websocket-protocol"
- @json 'wamp.2.json'
- @msgpack "wamp.2.msgpack"
-
- defstruct [:serializer, :session]
-
- def send_request(transport, message) do
- send(transport, {:send_request, message})
- end
-
- @impl true
- def init(req, %{db: db, proxy: proxy}) do
- {:ok, serializer, protocol} = get_serializer(req)
- req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
- {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
- end
-
- @impl true
- def websocket_init({state, db, proxy}) do
- {:ok, session} = start_session(db, proxy)
- Logger.info("Websocket Initialized: #{inspect(state)}")
- {:ok, %WebSocket{state | session: session}}
- end
-
- @impl true
- def websocket_handle({type, payload}, state) do
- Logger.debug("Received #{type} Payload: #{payload}")
- handle_payload({payload, state})
- {:ok, state, :hibernate}
- end
-
- @impl true
- def websocket_handle(:ping, state) do
- {[:pong], state, :hibernate}
- end
-
- @impl true
- def websocket_handle(unknown, state) do
- Logger.warn("Unknown websocket frame #{inspect(unknown)}")
- {:ok, state, :hibernate}
- end
-
- @impl true
- def websocket_info({:send_request, message}, %WebSocket{serializer: s} = state) do
- {[{s.data_type(), s.serialize!(message)}], state, :hibernate}
- end
-
- @impl true
- def websocket_info(
- {:EXIT, _pid = session_pid, reason},
- %WebSocket{session: session_pid} = state
- ) do
- Logger.warn("Session ended because #{inspect(reason)}, closing connection")
- {:stop, state}
- end
-
- @impl true
- def websocket_info(event, state) do
- Logger.debug("Received unhandled event: #{inspect(event)}")
- {:ok, state}
- end
-
- @impl true
- def terminate(reason, _req, _state) do
- Logger.info("Websocket closing for reason #{inspect(reason)}")
- :ok
- end
-
- defp handle_payload({payload, %WebSocket{serializer: s} = state}) do
- payload
- |> s.deserialize!()
- |> handle_event(state)
- end
-
- defp handle_event(ms, %WebSocket{session: s} = state) do
- send(s, {:set_message, ms})
- {:ok, state}
- end
-
- defp start_session(db, proxy) do
- Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
- end
-
- defp get_serializer(req) do
- @protocol_header
- |> :cowboy_req.parse_header(req)
- |> parse_protocol()
- end
-
- defp parse_protocol([]), do: Logger.error("Unknown protocol")
- defp parse_protocol([@json | _]), do: {:ok, JSON, @json}
- defp parse_protocol([@msgpack | _]), do: {:ok, MessagePack, @msgpack}
- defp parse_protocol([_ | t]), do: parse_protocol(t)
-end
+++ /dev/null
-defmodule Wampex.Transport do
- @moduledoc "Behaviour for Transports"
- @callback send_request(transport :: atom() | pid(), message :: Wampex.message()) :: :ok
- @callback start_link(
- url: binary(),
- session: Wampex.Client.Session.t(),
- protocol: binary(),
- serializer: module(),
- opts: []
- ) ::
- {:ok, pid}
- | {:error, term}
-end
defp deps do
[
- # {:cluster_kv, path: "../cluster_kv"},
- {:cluster_kv,
- git: "https://gitlab.com/entropealabs/cluster_kv.git",
- tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
- {:cors_plug, "~> 2.0"},
{:credo, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
- {:ecto_sql, "~> 3.0"},
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
{:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false},
{:jason, "~> 1.1"},
{:msgpack, "~> 0.7.0"},
- {:pbkdf2, "~> 2.0"},
- {:plug_cowboy, "~> 2.1"},
- {:postgrex, ">= 0.0.0"},
- {:states_language, "~> 0.2"},
- {:websockex, "~> 0.4"}
+ {:pbkdf2, "~> 2.0"}
]
end
add(:salt, :string, null: false)
add(:iterations, :integer, null: false)
add(:keylen, :integer, null: false)
- add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all))
+ add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false)
timestamps()
end
assert is_binary(id)
end
+ @tag :client
+ test "admin callee is invoked and responds with error" do
+ caller_name = TestAdminErrorCaller
+ Client.start_link(name: caller_name, session: @session)
+
+ assert {:error, 48, "Error registering user", %{}, [], %{}} =
+ Client.send_request(
+ caller_name,
+ Caller.call(%Call{
+ procedure: "admin.create_user",
+ arg_kw: %{authid: "chris", password: "woot!", realm: "not.real"}
+ })
+ )
+ end
+
@tag :client
test "callee is invoked and responds and caller gets result" do
callee_name = TestCalleeRespond