-defmodule Wampex.Authentication do
+defmodule Wampex.Client.Authentication do
@moduledoc false
alias Wampex.Crypto
-defmodule Wampex.Realm do
+defmodule Wampex.Client.Realm do
@moduledoc "Defines the WAMP Realm"
- alias Wampex.Authentication
+ alias Wampex.Client.Authentication
@enforce_keys [:name]
defstruct [:name, :authentication]
@max_id 9_007_199_254_740_992
alias StatesLanguage, as: SL
- alias Wampex.Realm
+ alias Wampex.Client.Realm
alias Wampex.Roles.Peer
alias Wampex.Roles.Peer.{Authenticate, Hello}
alias Wampex.Serializers.MessagePack
-defmodule Wampex.Transport do
+defmodule Wampex.Client.Transport do
@moduledoc "Behaviour for Transports"
@callback send_request(transport :: atom() | pid(), message :: Wampex.message()) :: :ok
@callback start_link(
defmodule Wampex.Client.Transports.WebSocket do
use WebSockex
- @behaviour Wampex.Transport
+ @behaviour Wampex.Client.Transport
alias WebSockex.Conn
require Logger
- @header "Sec-WebSocket-Protocol"
+ @header "sec-websocket-protocol"
@ping_interval 20_000
@impl true
+++ /dev/null
-defmodule Wampex.Crypto do
- @moduledoc false
- def hash_challenge(key, data) do
- :hmac
- |> :crypto.mac(:sha256, key, data)
- |> Base.encode64()
- end
-
- def pbkdf2(secret, salt, iterations, keylen) do
- {:ok, derived} = :pbkdf2.pbkdf2(:sha256, secret, salt, iterations, keylen)
- Base.encode64(derived)
- end
-
- def random_string(length) do
- length
- |> :crypto.strong_rand_bytes()
- |> Base.encode64()
- end
-end
+++ /dev/null
-defmodule Wampex.Role do
- @moduledoc "Behaviour for Roles"
- @callback add(map()) :: map()
- @callback handle(message :: Wampex.message()) ::
- {[:gen_statem.action()], integer() | nil, Wampex.handle_response()}
-end
+++ /dev/null
-defmodule Wampex.Roles.Broker do
- @moduledoc """
- Handles requests and responses for a Broker
- """
- alias Wampex.Role
- @behaviour Role
-
- @publish 16
- @published 17
- @subscribe 32
- @subscribed 33
- @unsubscribe 34
- @unsubscribed 35
- @event 36
-
- defmodule Event do
- @moduledoc false
- @enforce_keys [:subscription_id, :publication_id]
- defstruct [:subscription_id, :publication_id, arg_list: [], arg_kw: %{}, options: %{}]
-
- @type t :: %__MODULE__{
- subscription_id: integer(),
- publication_id: integer(),
- arg_list: list(any()),
- arg_kw: map(),
- options: map()
- }
- end
-
- defmodule Subscribed do
- @moduledoc false
- @enforce_keys [:request_id, :subscription_id]
- defstruct [:request_id, :subscription_id]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- subscription_id: integer()
- }
- end
-
- defmodule Unsubscribed do
- @moduledoc false
- @enforce_keys [:request_id]
- defstruct [:request_id]
-
- @type t :: %__MODULE__{
- request_id: integer()
- }
- end
-
- defmodule Published do
- @moduledoc false
- @enforce_keys [:request_id, :publication_id]
- defstruct [:request_id, :publication_id]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- publication_id: integer()
- }
- end
-
- @impl true
- def add(roles) do
- Map.put(roles, :broker, %{})
- end
-
- @spec event(Event.t()) :: Wampex.message()
- def event(%Event{
- subscription_id: si,
- publication_id: pi,
- arg_list: al,
- arg_kw: akw,
- options: opts
- }) do
- [@event, si, pi, opts, al, akw]
- end
-
- @spec subscribed(Subscribed.t()) :: Wampex.message()
- def subscribed(%Subscribed{request_id: ri, subscription_id: si}) do
- [@subscribed, ri, si]
- end
-
- @spec unsubscribed(Unsubscribed.t()) :: Wampex.message()
- def unsubscribed(%Unsubscribed{request_id: ri}) do
- [@unsubscribed, ri]
- end
-
- @spec published(Published.t()) :: Wampex.message()
- def published(%Published{request_id: ri, publication_id: pi}) do
- [@published, ri, pi]
- end
-
- @impl true
- def handle([@subscribe, request_id, opts, topic]) do
- {[{:next_event, :internal, :subscribe}], request_id,
- {:update, :subscribe, {:subscribe, request_id, opts, topic}}}
- end
-
- @impl true
- def handle([@unsubscribe, request_id, id]) do
- {[{:next_event, :internal, :unsubscribe}], request_id,
- {:update, :unsubscribe, {:unsubscribe, request_id, id}}}
- end
-
- @impl true
- def handle([@publish, id, opts, topic]) do
- handle([@publish, id, opts, topic, [], %{}])
- end
-
- @impl true
- def handle([@publish, id, opts, topic, arg_l]) do
- handle([@publish, id, opts, topic, arg_l, %{}])
- end
-
- @impl true
- def handle([@publish, id, opts, topic, arg_l, arg_kw]) do
- {[{:next_event, :internal, :publish}], id,
- {:update, :publish, {:publish, id, opts, topic, arg_l, arg_kw}}}
- end
-end
+++ /dev/null
-defmodule Wampex.Roles.Callee do
- @moduledoc """
- Handles requests and responses for a Callee
- """
-
- alias Wampex.Role
- @behaviour Role
-
- @register 64
- @registered 65
- @unregister 66
- @unregistered 67
- @invocation 68
- @yield 70
- @interrupt 69
-
- defmodule Register do
- @moduledoc false
- @enforce_keys [:procedure]
- defstruct [:procedure, options: %{}]
-
- @type t :: %__MODULE__{
- procedure: binary(),
- options: map()
- }
- end
-
- defmodule Unregister do
- @moduledoc false
- @enforce_keys [:registration_id]
- defstruct [:registration_id]
-
- @type t :: %__MODULE__{
- registration_id: integer()
- }
- end
-
- defmodule Yield do
- @moduledoc false
- @enforce_keys [:request_id]
- defstruct [:request_id, arg_list: [], arg_kw: %{}, options: %{}]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- arg_list: list(any()),
- arg_kw: map(),
- options: map()
- }
- end
-
- @impl true
- def add(roles) do
- Map.put(roles, :callee, %{})
- end
-
- @spec register(Register.t()) :: Wampex.message()
- def register(%Register{procedure: p, options: opts}) do
- [@register, opts, p]
- end
-
- @spec unregister(Unregister.t()) :: Wampex.message()
- def unregister(%Unregister{registration_id: ri}) do
- [@unregister, ri]
- end
-
- @spec yield(Yield.t()) :: Wampex.message()
- def yield(%Yield{request_id: ri, arg_list: al, arg_kw: akw, options: opts}) do
- [@yield, ri, opts, al, akw]
- end
-
- @impl true
- def handle([@unregistered, request_id]) do
- {[{:next_event, :internal, :established}], request_id, {:ok, request_id}}
- end
-
- @impl true
- def handle([@registered, request_id, id]) do
- {[{:next_event, :internal, :established}], request_id, {:ok, id}}
- end
-
- @impl true
- def handle([@invocation, id, reg_id, dets]) do
- handle([@invocation, id, reg_id, dets, [], %{}])
- end
-
- @impl true
- def handle([@invocation, id, reg_id, dets, arg_l]) do
- handle([@invocation, id, dets, reg_id, arg_l, %{}])
- end
-
- @impl true
- def handle([@invocation, id, reg_id, dets, arg_l, arg_kw]) do
- {[{:next_event, :internal, :invocation}], id,
- {:update, :invocation, {:invocation, id, reg_id, dets, arg_l, arg_kw}}}
- end
-
- @impl true
- def handle([@interrupt, id, opts]) do
- {[{:next_event, :internal, :interrupt}], id, {:update, :interrupt, {id, opts}}}
- end
-end
+++ /dev/null
-defmodule Wampex.Roles.Caller do
- @moduledoc """
- Handles requests and responses for a Caller
- """
-
- alias Wampex.Role
- alias Wampex.Roles.Peer.Error
- @behaviour Role
-
- @call 48
- @cancel 49
- @error 8
- @result 50
-
- defmodule Call do
- @moduledoc false
- @enforce_keys [:procedure]
-
- defstruct [:procedure, arg_list: [], arg_kw: %{}, options: %{}]
-
- @type t :: %__MODULE__{
- procedure: binary(),
- arg_list: list(any()),
- arg_kw: map(),
- options: map()
- }
- end
-
- defmodule Cancel do
- @moduledoc false
- @enforce_keys [:request_id]
- defstruct [:request_id, options: %{}]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- options: map()
- }
- end
-
- @impl true
- def add(roles) do
- Map.put(roles, :caller, %{})
- end
-
- @spec call(Call.t()) :: Wampex.message()
- def call(%Call{procedure: p, arg_list: al, arg_kw: akw, options: opts}) do
- [@call, opts, p, al, akw]
- end
-
- @spec call_error(Error.t()) :: Wampex.message()
- def call_error(%Error{request_id: rid, error: er, details: dets, arg_l: al, arg_kw: akw}) do
- [@error, @call, rid, dets, er, al, akw]
- end
-
- @spec cancel(Cancel.t()) :: Wampex.message()
- def cancel(%Cancel{request_id: ri, options: opts}) do
- [@cancel, ri, opts]
- end
-
- @impl true
- def handle([@result, id, dets]) do
- handle([@result, id, dets, [], %{}])
- end
-
- @impl true
- def handle([@result, id, dets, arg_l]) do
- handle([@result, id, dets, arg_l, %{}])
- end
-
- @impl true
- def handle([@result, id, dets, arg_l, arg_kw]) do
- {[{:next_event, :internal, :established}], id, {:ok, dets, arg_l, arg_kw}}
- end
-end
+++ /dev/null
-defmodule Wampex.Roles.Dealer do
- @moduledoc """
- Handles requests and responses for a Dealer
- """
- alias Wampex.Role
- @behaviour Role
-
- @call 48
- @result 50
- @register 64
- @registered 65
- @unregister 66
- @unregistered 67
- @invocation 68
- @yield 70
-
- defmodule Registered do
- @moduledoc false
- @enforce_keys [:request_id, :registration_id]
- defstruct [:request_id, :registration_id]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- registration_id: integer()
- }
- end
-
- defmodule Unregistered do
- @moduledoc false
- @enforce_keys [:request_id]
- defstruct [:request_id]
-
- @type t :: %__MODULE__{
- request_id: integer()
- }
- end
-
- defmodule Result do
- @moduledoc false
- @enforce_keys [:request_id]
- defstruct [:request_id, arg_list: [], arg_kw: %{}, options: %{}]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- arg_list: list(any()),
- arg_kw: map(),
- options: map()
- }
- end
-
- defmodule Invocation do
- @moduledoc false
- @enforce_keys [:request_id, :registration_id]
- defstruct [:request_id, :registration_id, arg_list: [], arg_kw: %{}, options: %{}]
-
- @type t :: %__MODULE__{
- request_id: integer(),
- registration_id: integer(),
- arg_list: list(any()),
- arg_kw: map(),
- options: map()
- }
- end
-
- @impl true
- def add(roles) do
- Map.put(roles, :dealer, %{})
- end
-
- @spec registered(Registered.t()) :: Wampex.message()
- def registered(%Registered{request_id: ri, registration_id: reg_id}) do
- [@registered, ri, reg_id]
- end
-
- @spec unregistered(Unregistered.t()) :: Wampex.message()
- def unregistered(%Unregistered{request_id: ri}) do
- [@unregistered, ri]
- end
-
- @spec invocation(Invocation.t()) :: Wampex.message()
- def invocation(%Invocation{
- request_id: ri,
- registration_id: reg_id,
- options: opts,
- arg_list: al,
- arg_kw: akw
- }) do
- [@invocation, ri, reg_id, opts, al, akw]
- end
-
- @spec result(Result.t()) :: Wampex.message()
- def result(%Result{
- request_id: ri,
- arg_list: al,
- arg_kw: akw,
- options: opts
- }) do
- [@result, ri, opts, al, akw]
- end
-
- @impl true
- def handle([@unregister, request_id, registration_id]) do
- {[{:next_event, :internal, :unregister}], request_id,
- {:update, :unregister, {:unregister, request_id, registration_id}}}
- end
-
- @impl true
- def handle([@register, request_id, opts, procedure]) do
- {[{:next_event, :internal, :register}], request_id,
- {:update, :register, {:register, request_id, opts, procedure}}}
- end
-
- @impl true
- def handle([@call, id, dets, proc]) do
- handle([@call, id, dets, proc, [], %{}])
- end
-
- @impl true
- def handle([@call, id, dets, proc, arg_l]) do
- handle([@call, id, dets, proc, arg_l, %{}])
- end
-
- @impl true
- def handle([@call, id, dets, proc, arg_l, arg_kw]) do
- {[{:next_event, :internal, :call}], id,
- {:update, :call, {:call, id, dets, proc, arg_l, arg_kw}}}
- end
-
- @impl true
- def handle([@yield, id, dets]) do
- handle([@yield, id, dets, [], %{}])
- end
-
- @impl true
- def handle([@yield, id, dets, arg_l]) do
- handle([@yield, id, dets, arg_l, %{}])
- end
-
- @impl true
- def handle([@yield, id, dets, arg_l, arg_kw]) do
- {[{:next_event, :internal, :yield}], id, {:update, :yield, {:yield, id, dets, arg_l, arg_kw}}}
- end
-end
+++ /dev/null
-defmodule Wampex.Roles.Peer do
- @moduledoc """
- Handles requests and responses for low-level Peer/Session interactions
- """
- alias Wampex.Role
- @behaviour Role
-
- @hello 1
- @welcome 2
- @abort 3
- @challenge 4
- @authenticate 5
- @goodbye 6
- @error 8
-
- defmodule Hello do
- @moduledoc false
- @enforce_keys [:realm, :roles]
- defstruct [:realm, :roles, options: %{}, agent: "WAMPex"]
-
- @type t :: %__MODULE__{
- realm: binary(),
- roles: nonempty_list(module()),
- agent: binary(),
- options: map()
- }
- end
-
- defmodule Welcome do
- @moduledoc false
- @enforce_keys [:session_id]
- defstruct [:session_id, options: %{}]
-
- @type t :: %__MODULE__{
- session_id: integer(),
- options: map()
- }
- end
-
- defmodule Challenge do
- @moduledoc false
- @enforce_keys [:auth_method]
- defstruct [:auth_method, options: %{}]
-
- @type t :: %__MODULE__{
- auth_method: String.t(),
- options: %{}
- }
- end
-
- defmodule Goodbye do
- @moduledoc false
- @enforce_keys [:reason]
- defstruct [:reason, options: %{}]
-
- @type t :: %__MODULE__{
- reason: binary(),
- options: map()
- }
- end
-
- defmodule Authenticate do
- @moduledoc false
- @enforce_keys [:signature, :extra]
- defstruct [:signature, :extra]
-
- @type t :: %__MODULE__{
- signature: binary(),
- extra: map()
- }
- end
-
- defmodule Abort do
- @moduledoc false
- @enforce_keys [:reason]
- defstruct [:reason, details: %{}]
-
- @type t :: %__MODULE__{
- reason: String.t(),
- details: map()
- }
- end
-
- defmodule Error do
- @moduledoc false
- @enforce_keys [:error]
- defstruct [:request_id, :error, arg_l: [], arg_kw: %{}, details: %{}]
-
- @type t :: %__MODULE__{
- request_id: integer() | nil,
- error: String.t(),
- arg_l: list(),
- arg_kw: map(),
- details: map()
- }
- end
-
- @impl true
- def add(roles), do: roles
-
- @spec hello(Hello.t()) :: Wampex.message()
- def hello(%Hello{realm: r, roles: roles, agent: agent, options: opts}) do
- options =
- Map.merge(opts, %{agent: agent, roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)})
-
- [@hello, r, options]
- end
-
- @spec welcome(Welcome.t()) :: Wampex.message()
- def welcome(%Welcome{session_id: si, options: opts}) do
- [@welcome, si, opts]
- end
-
- @spec abort(Abort.t()) :: Wampex.message()
- def abort(%Abort{reason: reason, details: opts}) do
- [@abort, opts, reason]
- end
-
- @spec challenge(Challenge.t()) :: Wampex.message()
- def challenge(%Challenge{auth_method: am, options: opts}) do
- [@challenge, am, opts]
- end
-
- @spec goodbye(Goodbye.t()) :: Wampex.message()
- def goodbye(%Goodbye{reason: r, options: opts}) do
- [@goodbye, opts, r]
- end
-
- @spec authenticate(Authenticate.t()) :: Wampex.message()
- def authenticate(%Authenticate{signature: s, extra: e}) do
- [@authenticate, s, e]
- end
-
- @impl true
- def handle([@hello, realm, dets]) do
- {[{:next_event, :internal, :hello}], nil, {:update, :hello, {:hello, realm, dets}}}
- end
-
- @impl true
- def handle([@authenticate, sig, dets]) do
- {[{:next_event, :internal, :authenticate}], nil,
- {:update, :authenticate, {:authenticate, sig, dets}}}
- end
-
- @impl true
- def handle([@welcome, session_id, _dets]) do
- {[{:next_event, :internal, :established}], nil, {:update, :id, session_id}}
- end
-
- @impl true
- def handle([@abort, _dets, reason]) do
- {[{:next_event, :internal, :abort}], nil, {:update, :error, reason}}
- end
-
- @impl true
- def handle([@goodbye, _dets, reason]) do
- {[{:next_event, :internal, :goodbye}], nil, {:update, :goodbye, reason}}
- end
-
- @impl true
- def handle([@challenge, auth_method, extra]) do
- {[{:next_event, :internal, :challenge}], nil, {:update, :challenge, {auth_method, extra}}}
- end
-
- @impl true
- def handle([@error, type, id, dets, error]) do
- handle([@error, type, id, dets, error, [], %{}])
- end
-
- @impl true
- def handle([@error, type, id, dets, error, arg_l]) do
- handle([@error, type, id, dets, error, arg_l, %{}])
- end
-
- @impl true
- def handle([@error, type, id, dets, error, arg_l, arg_kw]) do
- {[{:next_event, :internal, :established}], id, {:error, type, error, dets, arg_l, arg_kw}}
- end
-end
+++ /dev/null
-defmodule Wampex.Roles.Publisher do
- @moduledoc """
- Handles requests and responses for Publishers
- """
-
- alias Wampex.Role
- @behaviour Role
-
- @publish 16
- @published 17
-
- defmodule Publish do
- @moduledoc false
- @enforce_keys [:topic]
- defstruct [:topic, arg_list: [], arg_kw: %{}, options: %{}]
-
- @type t :: %__MODULE__{
- topic: binary(),
- arg_list: list(any()),
- arg_kw: map(),
- options: map()
- }
- end
-
- @impl true
- def add(roles) do
- Map.put(roles, :publisher, %{})
- end
-
- @spec publish(Publish.t()) :: Wampex.message()
- def publish(%Publish{topic: t, options: opts, arg_list: al, arg_kw: akw}) do
- [@publish, opts, t, al, akw]
- end
-
- @impl true
- def handle([@published, request_id, id]) do
- {[{:next_event, :internal, :established}], request_id, {:ok, id}}
- end
-end
+++ /dev/null
-defmodule Wampex.Roles.Subscriber do
- @moduledoc """
- Handles requests and responses for Subscribers
- """
-
- alias Wampex.Role
- @behaviour Role
-
- @subscribe 32
- @subscribed 33
- @unsubscribe 34
- @unsubscribed 35
- @event 36
-
- defmodule Subscribe do
- @moduledoc false
- @enforce_keys [:topic]
- defstruct [:topic, options: %{}]
-
- @type t :: %__MODULE__{
- topic: binary(),
- options: map()
- }
- end
-
- defmodule Unsubscribe do
- @moduledoc false
- @enforce_keys [:subscription_id]
- defstruct [:subscription_id]
-
- @type t :: %__MODULE__{
- subscription_id: integer()
- }
- end
-
- @impl true
- def add(roles) do
- Map.put(roles, :subscriber, %{})
- end
-
- @spec subscribe(Subscribe.t()) :: Wampex.message()
- def subscribe(%Subscribe{topic: t, options: opts}) do
- [@subscribe, opts, t]
- end
-
- @spec unsubscribe(Unsubscribe.t()) :: Wampex.message()
- def unsubscribe(%Unsubscribe{subscription_id: si}) do
- [@unsubscribe, si]
- end
-
- @impl true
- def handle([@subscribed, request_id, id]) do
- {[{:next_event, :internal, :established}], request_id, {:ok, id}}
- end
-
- @impl true
- def handle(<<@unsubscribed, request_id>>) do
- handle([@unsubscribed, request_id])
- end
-
- @impl true
- def handle([@unsubscribed, request_id]) do
- {[{:next_event, :internal, :established}], request_id, :ok}
- end
-
- @impl true
- def handle([@event, sub_id, pub_id, dets]) do
- handle([@event, sub_id, pub_id, dets, [], %{}])
- end
-
- @impl true
- def handle([@event, sub_id, pub_id, dets, arg_l]) do
- handle([@event, sub_id, pub_id, dets, arg_l, %{}])
- end
-
- @impl true
- def handle([@event, sub_id, pub_id, dets, arg_l, arg_kw]) do
- {[{:next_event, :internal, :event}], nil,
- {:update, :event, {:event, sub_id, pub_id, dets, arg_l, arg_kw}}}
- end
-end
+++ /dev/null
-defmodule Wampex.Router do
- use Supervisor
-
- alias Wampex.Router.{Admin, Authentication, Proxy, REST}
- alias Wampex.Router.Authentication.EnsureDefaultAdmin
- alias Wampex.Router.Transports.WebSocket
-
- @spec start_link(
- name: module(),
- port: pos_integer(),
- topologies: [],
- replicas: integer(),
- quorum: integer(),
- admin_realm: String.t(),
- admin_authid: String.t(),
- admin_password: String.t()
- ) ::
- {:ok, pid()}
- | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
- def start_link(
- name: name,
- port: port,
- topologies: topologies,
- replicas: repls,
- quorum: quorum,
- admin_realm: admin_realm,
- admin_authid: admin_authid,
- admin_password: admin_password
- )
- when is_atom(name) do
- Supervisor.start_link(
- __MODULE__,
- {name, port, topologies, repls, quorum, admin_realm, admin_authid, admin_password},
- name: name
- )
- end
-
- @spec init(
- {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(),
- String.t()}
- ) ::
- {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
- def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do
- children = [
- Authentication.Repo,
- {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]},
- {ClusterKV,
- [
- name: db_name(name),
- topologies: topologies,
- replicas: replicas,
- quorum: quorum
- ]},
- {Proxy, [name: proxy_name(name)]},
- {Admin,
- [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]},
- {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
- ]
-
- Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10)
- end
-
- def db_name(name), do: Module.concat([name, KV])
- def proxy_name(name), do: Module.concat([name, Proxy])
- def admin_name(name), do: Module.concat([name, Admin])
-
- defp dispatch(name) do
- [
- {:_,
- [
- {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
- {:_, Plug.Cowboy.Handler, {REST, []}}
- ]}
- ]
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Admin do
- @moduledoc false
- use GenServer
- require Logger
- alias Wampex.Roles.Dealer.{Invocation, Result}
- alias Wampex.Router.Authentication.{User, Realm}
- alias Wampex.Router.Session
-
- @procedures [
- "admin.create_user",
- "admin.create_realm"
- ]
-
- def start_link(name: name, db: db, realm: realm, proxy: proxy) do
- GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name)
- end
-
- def init({db, realm, proxy}) do
- {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}}
- end
-
- def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do
- regs = register_procedures(db, realm, regs)
- {:noreply, %{state | regs: regs}}
- end
-
- def register_procedures(db, realm, regs) do
- Enum.reduce(@procedures, regs, fn proc, acc ->
- val = {Session.get_global_id(), {self(), Node.self()}}
- ClusterKV.put(db, realm, proc, val)
- [{proc, val} | acc]
- end)
- end
-
- def handle_info(
- {req_id,
- %Invocation{
- arg_kw: %{"realm" => realm, "authid" => authid, "password" => password},
- options: %{"procedure" => "admin.create_user"}
- } = event, {pid, node}},
- %{proxy: proxy} = state
- ) do
- realm = Realm.get(uri: realm)
- %User{id: id} = User.create(authid: authid, password: password, realm: realm)
- Logger.info("Admin handled event: #{inspect(event)}")
- send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid})
- {:noreply, state}
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication do
- @moduledoc false
-
- require Logger
-
- alias Wampex.Crypto
- alias Wampex.Serializers.JSON
- alias Wampex.Router.Authentication.{Realm, User}
-
- @wampcra "wampcra"
- @auth_provider "userdb"
- @auth_role "user"
-
- def authenticate?(methods) do
- can_auth(methods)
- end
-
- def method, do: @wampcra
-
- def challenge(realm, authid, session_id) do
- %Realm{} = realm = Realm.get(uri: realm)
- %User{} = user = User.get(authid: authid, realm: realm)
- now = DateTime.to_iso8601(DateTime.utc_now())
-
- %{
- challenge:
- JSON.serialize!(%{
- nonce: Crypto.random_string(user.keylen),
- authprovider: @auth_provider,
- authid: authid,
- timestamp: now,
- authrole: @auth_role,
- authmethod: @wampcra,
- session: session_id
- }),
- salt: user.salt,
- keylen: user.keylen,
- iterations: user.iterations
- }
- end
-
- def authenticate(signature, realm, authid, %{
- challenge: challenge
- }) do
- authid
- |> get_secret(realm)
- |> Crypto.hash_challenge(challenge)
- |> :pbkdf2.compare_secure(signature)
- end
-
- defp get_secret(authid, uri) do
- realm = Realm.get(uri: uri)
- %User{password: password} = User.get(authid: authid, realm: realm)
- password
- end
-
- defp can_auth([]), do: false
- defp can_auth([@wampcra | _]), do: true
- defp can_auth([_ | t]), do: can_auth(t)
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.EnsureDefaultAdmin do
- @moduledoc false
- require Logger
- use GenServer
-
- alias Wampex.Router.Authentication.{Realm, User}
-
- def start_link(uri: uri, authid: authid, password: password) do
- GenServer.start_link(__MODULE__, {uri, authid, password})
- end
-
- def init({uri, authid, password}) do
- %Realm{} = realm = Realm.create(uri: uri)
- %User{} = user = User.create(authid: authid, password: password, realm: realm)
- Logger.info("Realm: #{inspect(realm)}")
- Logger.info("User: #{inspect(user)}")
- :ignore
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.Realm do
- @moduledoc """
- CREATE TABLE authentication.realms (
- id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
- uri STRING(255) NOT NULL UNIQUE,
- parent STRING NULL,
- inserted_at TIMESTAMP NOT NULL,
- updated_at TIMESTAMP NOT NULL
- );
-
- """
- use Ecto.Schema
-
- alias __MODULE__
- alias Wampex.Router.Authentication.Repo
-
- @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
- @foreign_key_type :binary_id
- schema "realms" do
- field(:uri, :string)
- field(:parent, :string)
- timestamps()
- end
-
- def get(uri: uri) do
- Repo.get_by(Realm, uri: uri)
- end
-
- def create(uri: uri) do
- try do
- {:ok, r} =
- %Realm{uri: uri, parent: uri}
- |> Repo.insert()
-
- r
- rescue
- _er ->
- get(uri: uri)
- end
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.Repo do
- use Ecto.Repo,
- otp_app: :wampex,
- adapter: Ecto.Adapters.Postgres
-end
+++ /dev/null
-defmodule Wampex.Router.Authentication.User do
- @moduledoc """
- CREATE TABLE authentication.users (
- id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
- authid STRING(255) NOT NULL,
- password STRING NOT NULL,
- salt STRING NOT NULL,
- iterations INT NOT NULL,
- keylen INT NOT NULL,
- realm_id UUID NOT NULL REFERENCES authentication.realms (id) ON DELETE CASCADE,
- inserted_at TIMESTAMP NOT NULL,
- updated_at TIMESTAMP NOT NULL,
- UNIQUE (authid, realm_id),
- INDEX (authid)
- );
-
- """
-
- use Ecto.Schema
- alias __MODULE__
- alias Ecto.Migration
- alias Wampex.Crypto
- alias Wampex.Router.Authentication.{Realm, Repo}
-
- @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
- @foreign_key_type :binary_id
- schema "users" do
- field(:authid, :string)
- field(:password, :string)
- field(:salt, :string)
- field(:iterations, :integer)
- field(:keylen, :integer)
- belongs_to(:realm, Realm)
- timestamps()
- end
-
- def get(authid: authid, realm: realm) do
- Repo.get_by(User, authid: authid, realm_id: realm.id)
- end
-
- def create(authid: authid, password: password, realm: realm) do
- keylen = Application.get_env(:wampex, :keylen)
- salt = Crypto.random_string(keylen)
- iterations = Enum.random(10_000..50_000)
-
- try do
- password = Crypto.pbkdf2(password, salt, iterations, keylen)
-
- {:ok, u} =
- %User{
- authid: authid,
- password: password,
- realm: realm,
- salt: salt,
- iterations: iterations,
- keylen: keylen
- }
- |> Repo.insert()
-
- u
- rescue
- _er ->
- get(authid: authid, realm: realm)
- end
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Proxy do
- @moduledoc false
- use GenServer
-
- def start_link(name: name) do
- GenServer.start_link(__MODULE__, :ok, name: name)
- end
-
- def init(:ok) do
- {:ok, []}
- end
-
- def handle_info({e, to}, state) do
- send(to, e)
- {:noreply, state}
- end
-
- def handle_call({:is_up, pid}, _from, s), do: {:reply, Process.alive?(pid), s}
-end
+++ /dev/null
-defmodule Wampex.Router.REST do
- @moduledoc false
- use Plug.Router
- import Plug.Conn
-
- plug(CORSPlug, origin: ["*"])
- plug(:match)
- plug(:dispatch)
-
- get "/favicon.ico" do
- send_resp(conn, 200, "ok")
- end
-
- get "/alivez" do
- send_resp(conn, 200, "ok")
- end
-
- get "/readyz" do
- send_resp(conn, 200, "ok")
- end
-
- match _ do
- send_resp(conn, 404, "oops")
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Session do
- @moduledoc """
- A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
- """
- use StatesLanguage, data: "priv/router.json"
-
- @max_id 9_007_199_254_740_992
-
- alias __MODULE__, as: Sess
- alias StatesLanguage, as: SL
- alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
- alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
- alias Wampex.Router
- alias Wampex.Serializers.JSON
- alias Broker.{Subscribed, Published, Event, Unsubscribed}
- alias Dealer.{Invocation, Registered, Result, Unregistered}
- alias Router.Authentication
-
- @enforce_keys [:transport, :transport_pid]
- defstruct [
- :id,
- :db,
- :proxy,
- :transport,
- :transport_pid,
- :message,
- :hello,
- :authenticate,
- :register,
- :unregister,
- :call,
- :yield,
- :subscribe,
- :unsubscribe,
- :publish,
- :realm,
- :name,
- :goodbye,
- :peer_information,
- :challenge,
- error: "wamp.error.protocol_violation",
- authentication: Authentication,
- roles: [Peer, Broker, Dealer],
- registrations: [],
- subscriptions: [],
- invocations: [],
- message_queue: [],
- request_id: 0,
- requests: [],
- hello_received: false
- ]
-
- @type t :: %__MODULE__{
- id: integer() | nil,
- db: module() | nil,
- proxy: module() | nil,
- transport_pid: pid() | module() | nil,
- message: Wampex.message() | nil,
- hello: Wampex.hello() | nil,
- authenticate: Wampex.authenticate() | nil,
- register: Wampex.register() | nil,
- unregister: Wampex.unregister() | nil,
- call: Wampex.call() | nil,
- yield: Wampex.yield() | nil,
- subscribe: Wampex.subscribe() | nil,
- unsubscribe: Wampex.unsubscribe() | nil,
- publish: Wampex.publish() | nil,
- goodbye: binary() | nil,
- realm: String.t() | nil,
- name: module() | nil,
- challenge: map() | nil,
- roles: [module()],
- registrations: [],
- subscriptions: [],
- invocations: [],
- message_queue: [],
- request_id: integer(),
- requests: [],
- hello_received: boolean()
- }
-
- ## States
- @init "Init"
- @established "Established"
- @hello "Hello"
- @authenticate "Authenticate"
- @call "Call"
- @yield "Yield"
- @register "Register"
- @unregister "Unregister"
- @subscribe "Subscribe"
- @unsubscribe "Unsubscribe"
- @publish "Publish"
- # @error "Error"
- @abort "Abort"
- @goodbye "Goodbye"
-
- ## Resources
- @handle_init "HandleInit"
- @handle_established "HandleEstablished"
- @handle_message "HandleMessage"
- @handle_hello "HandleHello"
- @handle_authenticate "HandleAuthenticate"
- @handle_call "HandleCall"
- @handle_yield "HandleYield"
- @handle_register "HandleRegister"
- @handle_unregister "HandleUnregister"
- @handle_subscribe "HandleSubscribe"
- @handle_unsubscribe "HandleUnsubscribe"
- @handle_publish "HandlePublish"
- # @handle_interrupt "HandleInterrupt"
- @handle_abort "HandleAbort"
- @handle_goodbye "HandleGoodbye"
- # @handle_cancel "HandleCancel"
-
- def get_global_id do
- Enum.random(0..@max_id)
- end
-
- @impl true
- def handle_resource(
- @handle_init,
- _,
- @init,
- data
- ) do
- debug("Init")
-
- {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_established,
- _,
- @established,
- data
- ) do
- debug("Established")
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_message,
- _,
- _,
- %SL{data: %Sess{message: msg, roles: roles}} = data
- ) do
- debug("Handling Message #{inspect(msg)}")
-
- {data, actions} =
- case handle_message(msg, roles) do
- {actions, _id, response} ->
- {data, response} = maybe_update_response(data, response)
- debug("Response: #{inspect(response)}")
- {data, actions}
-
- nil ->
- {data, [{:next_event, :internal, :abort}]}
- end
-
- {:ok, data, actions}
- end
-
- @impl true
- def handle_resource(
- @handle_hello,
- _,
- @hello,
- %SL{
- data:
- %Sess{
- id: id,
- transport: tt,
- transport_pid: t,
- hello: {:hello, realm, dets},
- hello_received: false,
- authentication: auth
- } = data
- } = sl
- ) do
- Logger.info("Hello #{inspect(dets)}")
-
- {actions, challenge} =
- case dets do
- %{"authid" => ai, "authmethods" => am} ->
- case auth.authenticate?(am) do
- true ->
- ch = auth.challenge(realm, ai, id)
-
- chal = %Challenge{
- auth_method: auth.method(),
- options: ch
- }
-
- send_to_peer(
- Peer.challenge(chal),
- tt,
- t
- )
-
- {[], ch}
-
- false ->
- {[{:next_event, :internal, :abort}], nil}
- end
-
- %{} ->
- {[{:next_event, :internal, :abort}], nil}
- end
-
- {:ok,
- %SL{
- sl
- | data: %Sess{
- data
- | challenge: challenge,
- hello_received: true,
- realm: realm,
- peer_information: dets
- }
- }, [{:next_event, :internal, :transition}] ++ actions}
- end
-
- @impl true
- def handle_resource(
- @handle_authenticate,
- _,
- @authenticate,
- %SL{
- data: %Sess{
- id: session_id,
- transport: tt,
- transport_pid: t,
- authentication: auth,
- challenge: challenge,
- realm: realm,
- authenticate: {:authenticate, sig, _dets}
- }
- } = sl
- ) do
- ch = JSON.deserialize!(challenge.challenge)
- authid = get_in(ch, ["authid"])
- authrole = get_in(ch, ["authrole"])
- authmethod = get_in(ch, ["authmethod"])
- authprovider = get_in(ch, ["authprovider"])
-
- actions =
- case auth.authenticate(sig, realm, authid, challenge) do
- true ->
- send_to_peer(
- Peer.welcome(%Welcome{
- session_id: session_id,
- options: %{
- agent: "WAMPex Router",
- authid: authid,
- authrole: authrole,
- authmethod: authmethod,
- authprovider: authprovider,
- roles: %{broker: %{}, dealer: %{}}
- }
- }),
- tt,
- t
- )
-
- [{:next_event, :internal, :transition}]
-
- false ->
- [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]
- end
-
- {:ok, sl, actions}
- end
-
- @impl true
- def handle_resource(
- @handle_hello,
- _,
- @hello,
- %SL{
- data:
- %Sess{
- hello_received: true
- } = data
- } = sl
- ) do
- {:ok,
- %SL{
- sl
- | data: %Sess{
- data
- | error: "wamp.error.protocol_violation"
- }
- }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]}
- end
-
- @impl true
- def handle_resource(
- @handle_subscribe,
- _,
- @subscribe,
- %SL{
- data:
- %Sess{
- transport: tt,
- transport_pid: t,
- subscriptions: subs,
- realm: realm,
- subscribe: {:subscribe, ri, opts, topic},
- db: db
- } = data
- } = sl
- ) do
- id = get_global_id()
-
- wc =
- case opts do
- %{"match" => "wildcard"} ->
- ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
- true
-
- _ ->
- ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
- false
- end
-
- send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
-
- {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_unsubscribe,
- _,
- @unsubscribe,
- %SL{
- data: %Sess{
- realm: realm,
- transport: tt,
- transport_pid: t,
- db: db,
- subscriptions: subs,
- unsubscribe: {:unsubscribe, rid, subscription_id}
- }
- } = sl
- ) do
- subs = remove_subscription(subs, subscription_id, realm, db)
-
- send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
-
- {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_publish,
- _,
- @publish,
- %SL{
- data: %Sess{
- proxy: proxy,
- realm: realm,
- transport: tt,
- transport_pid: t,
- db: db,
- publish: {:publish, rid, opts, topic, arg_l, arg_kw}
- }
- } = sl
- ) do
- pub_id = get_global_id()
-
- {_, _, subs} =
- {db, {realm, topic}, []}
- |> get_subscribers()
- |> get_prefix()
- |> get_wildcard()
-
- subs = Enum.uniq(subs)
-
- Enum.each(subs, fn {id, {pid, node}} ->
- send(
- {proxy, node},
- {%Event{
- subscription_id: id,
- publication_id: pub_id,
- arg_list: arg_l,
- arg_kw: arg_kw,
- options: opts
- }, pid}
- )
- end)
-
- case opts do
- %{acknowledge: true} ->
- send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t)
-
- %{} ->
- :noop
- end
-
- {:ok, sl, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_unregister,
- _,
- @unregister,
- %SL{
- data:
- %Sess{
- db: db,
- transport: tt,
- transport_pid: t,
- registrations: regs,
- realm: realm,
- unregister: {:unregister, request_id, registration_id}
- } = data
- } = sl
- ) do
- regs = remove_registration(regs, realm, registration_id, db)
-
- send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
- {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
- end
-
- @impl true
- def handle_resource(
- @handle_register,
- _,
- @register,
- %SL{
- data:
- %Sess{
- transport: tt,
- transport_pid: t,
- registrations: regs,
- db: db,
- realm: realm,
- register: {:register, rid, _opts, procedure}
- } = data
- } = sl
- ) do
- id = get_global_id()
- ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
- regd = %Registered{request_id: rid, registration_id: id}
- send_to_peer(Dealer.registered(regd), tt, t)
-
- {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_call,
- _,
- @call,
- %SL{
- data:
- %Sess{
- db: db,
- proxy: proxy,
- transport: tt,
- transport_pid: t,
- realm: realm,
- request_id: ri,
- call: {:call, call_id, dets, proc, al, akw}
- } = data
- } = sl
- ) do
- data =
- with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
- {id, {pid, node}} <- get_live_callee(proxy, callees) do
- req_id = get_request_id(ri)
- dets = Map.put(dets, "procedure", proc)
-
- send(
- {proxy, node},
- {
- {
- call_id,
- %Invocation{
- request_id: req_id,
- registration_id: id,
- options: dets,
- arg_list: al,
- arg_kw: akw
- },
- {self(), Node.self()}
- },
- pid
- }
- )
-
- %SL{sl | data: %Sess{data | request_id: req_id}}
- else
- {:error, :no_live_callees} ->
- send_to_peer(
- Caller.call_error(%Error{
- request_id: call_id,
- error: "wamp.error.no_callees",
- details: %{procedure: proc}
- }),
- tt,
- t
- )
-
- sl
-
- :not_found ->
- send_to_peer(
- Caller.call_error(%Error{
- request_id: call_id,
- error: "wamp.error.no_registration",
- details: %{procedure: proc}
- }),
- tt,
- t
- )
-
- sl
- end
-
- {:ok, data, [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_yield,
- _,
- @yield,
- %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
- data
- ) do
- {call_id, _, {pid, node}} =
- Enum.find(inv, fn
- {_, %Invocation{request_id: ^id}, _} -> true
- _ -> false
- end)
-
- send(
- {proxy, node},
- {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
- )
-
- inv =
- Enum.filter(inv, fn
- {_, %Invocation{request_id: ^id}, _} -> false
- _ -> true
- end)
-
- {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
- [{:next_event, :internal, :transition}]}
- end
-
- @impl true
- def handle_resource(
- @handle_abort,
- _,
- @abort,
- %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data
- ) do
- send_to_peer(Peer.abort(%Abort{reason: er}), tt, t)
- Logger.warn("Aborting: #{data.data.error}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(
- @handle_goodbye,
- _,
- @goodbye,
- %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
- ) do
- send_to_peer(Peer.goodbye(goodbye), tt, t)
- {:ok, data, []}
- end
-
- @impl true
- def handle_resource(resource, _, state, data) do
- Logger.error("No specific resource handler for #{resource} in state #{state}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_info(
- {call_id, %Invocation{} = e, from},
- _,
- %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
- ) do
- send_to_peer(Dealer.invocation(e), tt, t)
- {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
- end
-
- @impl true
- def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
- send_to_peer(Broker.event(e), tt, t)
- {:ok, data, []}
- end
-
- @impl true
- def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
- send_to_peer(Dealer.result(e), tt, t)
- {:ok, data, []}
- end
-
- @impl true
- def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
- {:ok, %SL{data | data: %Sess{sess | message: message}},
- [{:next_event, :internal, :message_received}]}
- end
-
- @impl true
- def handle_info(event, state, data) do
- Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
- {:ok, data, []}
- end
-
- @impl true
- def handle_termination(reason, _, %SL{
- data: %Sess{
- db: db,
- transport: tt,
- transport_pid: t,
- registrations: regs,
- realm: realm,
- subscriptions: subs
- }
- }) do
- Logger.warn("Router session terminating #{inspect(reason)}")
-
- Enum.each(regs, fn {id, _proc} ->
- remove_registration(regs, realm, id, db)
- end)
-
- Enum.each(subs, fn {id, _proc, _} ->
- remove_subscription(subs, id, realm, db)
- end)
-
- send_to_peer(
- Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
- tt,
- t
- )
- end
-
- defp remove_subscription(subs, subscription_id, realm, db) do
- {id, topic, wc} =
- sub =
- Enum.find(subs, fn
- {^subscription_id, _topic, _} -> true
- _ -> false
- end)
-
- {key, filter} =
- case wc do
- true ->
- key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
-
- {key,
- fn {id, values}, value ->
- {id,
- Enum.filter(values, fn
- {_, ^value} -> false
- _ -> true
- end)}
- end}
-
- false ->
- {topic,
- fn {id, values}, value ->
- {id, List.delete(values, value)}
- end}
- end
-
- ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
- List.delete(subs, sub)
- end
-
- defp remove_registration(regs, realm, registration_id, db) do
- {id, proc} =
- reg =
- Enum.find(regs, fn
- {^registration_id, _proc} -> true
- _ -> false
- end)
-
- filter = fn {id, values}, value ->
- {id, List.delete(values, value)}
- end
-
- ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
- List.delete(regs, reg)
- end
-
- defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
-
- defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
- case GenServer.call({proxy, node}, {:is_up, pid}) do
- true -> c
- false -> get_live_callee(proxy, t)
- end
- end
-
- defp get_subscribers({db, {keyspace, key}, acc}) do
- case ClusterKV.get(db, keyspace, key, 500) do
- {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
- _ -> {db, {keyspace, key}, acc}
- end
- end
-
- defp get_prefix({db, {keyspace, key}, acc}) do
- l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
- {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
- end
-
- defp get_wildcard({db, {keyspace, key}, acc}) do
- l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
- {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
- end
-
- defp send_to_peer(msg, transport, pid) do
- transport.send_request(pid, remove_nil_values(msg))
- end
-
- defp maybe_update_response(data, {:update, key, resp}) do
- {%SL{data | data: Map.put(data.data, key, resp)}, resp}
- end
-
- defp maybe_update_response(data, resp), do: {data, resp}
-
- defp remove_nil_values(message) do
- message =
- case Enum.reverse(message) do
- [map | t] when map == %{} -> t
- [nil | t] -> t
- m -> m
- end
-
- message =
- case message do
- [list | t] when list == [] ->
- t
-
- [nil | t] ->
- t
-
- m ->
- m
- end
-
- Enum.reverse(message)
- end
-
- defp get_request_id(current_id) when current_id == @max_id do
- 1
- end
-
- defp get_request_id(current_id) do
- current_id + 1
- end
-
- defp handle_message(msg, roles) do
- Enum.reduce_while(roles, nil, fn r, _ ->
- try do
- res = r.handle(msg)
- {:halt, res}
- rescue
- FunctionClauseError -> {:cont, nil}
- end
- end)
- end
-end
+++ /dev/null
-defmodule Wampex.Router.Transports.WebSocket do
- @moduledoc false
- @behaviour :cowboy_websocket
-
- require Logger
-
- alias __MODULE__
- alias Wampex.Router.Session
- alias Wampex.Serializers.{JSON, MessagePack}
-
- @protocol_header "sec-websocket-protocol"
- @json 'wamp.2.json'
- @msgpack "wamp.2.msgpack"
-
- defstruct [:serializer, :session]
-
- def send_request(transport, message) do
- send(transport, {:send_request, message})
- end
-
- @impl true
- def init(req, %{db: db, proxy: proxy}) do
- {:ok, serializer, protocol} = get_serializer(req)
- req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
- {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
- end
-
- @impl true
- def websocket_init({state, db, proxy}) do
- {:ok, session} = start_session(db, proxy)
- Logger.info("Websocket Initialized: #{inspect(state)}")
- {:ok, %WebSocket{state | session: session}}
- end
-
- @impl true
- def websocket_handle({type, payload}, state) do
- Logger.debug("Received #{type} Payload: #{payload}")
- handle_payload({payload, state})
- {:ok, state, :hibernate}
- end
-
- @impl true
- def websocket_handle(:ping, state) do
- {[:pong], state, :hibernate}
- end
-
- @impl true
- def websocket_handle(unknown, state) do
- Logger.warn("Unknown websocket frame #{inspect(unknown)}")
- {:ok, state, :hibernate}
- end
-
- @impl true
- def websocket_info({:send_request, message}, %WebSocket{serializer: s} = state) do
- {[{s.data_type(), s.serialize!(message)}], state, :hibernate}
- end
-
- @impl true
- def websocket_info(
- {:EXIT, _pid = session_pid, reason},
- %WebSocket{session: session_pid} = state
- ) do
- Logger.warn("Session ended because #{inspect(reason)}, closing connection")
- {:stop, state}
- end
-
- @impl true
- def websocket_info(event, state) do
- Logger.debug("Received unhandled event: #{inspect(event)}")
- {:ok, state}
- end
-
- @impl true
- def terminate(reason, _req, _state) do
- Logger.info("Websocket closing for reason #{inspect(reason)}")
- :ok
- end
-
- defp handle_payload({payload, %WebSocket{serializer: s} = state}) do
- payload
- |> s.deserialize!()
- |> handle_event(state)
- end
-
- defp handle_event(ms, %WebSocket{session: s} = state) do
- send(s, {:set_message, ms})
- {:ok, state}
- end
-
- defp start_session(db, proxy) do
- Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
- end
-
- defp get_serializer(req) do
- @protocol_header
- |> :cowboy_req.parse_header(req)
- |> parse_protocol()
- end
-
- defp parse_protocol([]), do: Logger.error("Unknown protocol")
- defp parse_protocol([@json | _]), do: {:ok, JSON, @json}
- defp parse_protocol([@msgpack | _]), do: {:ok, MessagePack, @msgpack}
- defp parse_protocol([_ | t]), do: parse_protocol(t)
-end
+++ /dev/null
-defmodule Wampex.Serializer do
- @moduledoc "Behaviour for Serializers"
- @type data_type :: :binary | :text
- @callback data_type() :: data_type()
- @callback serialize!(data :: Wampex.message()) :: binary()
- @callback serialize(data :: Wampex.message()) :: {:ok, binary()}
- @callback deserialize!(data :: binary()) :: Wampex.message()
- @callback deserialize(data :: binary()) :: {:ok, Wampex.message()}
-end
+++ /dev/null
-defmodule Wampex.Serializers.JSON do
- @moduledoc "JSON Serializer"
- @behaviour Wampex.Serializer
-
- @impl true
- def data_type, do: :text
- @impl true
- def serialize!(data), do: Jason.encode!(data)
- @impl true
- def serialize(data), do: Jason.encode(data)
- @impl true
- def deserialize!(binary), do: Jason.decode!(binary)
- @impl true
- def deserialize(binary), do: Jason.decode(binary)
-end
+++ /dev/null
-defmodule Wampex.Serializers.MessagePack do
- @moduledoc "MessgePack Serializer"
- @behaviour Wampex.Serializer
-
- @impl true
- def data_type, do: :binary
- @impl true
- def serialize!(data), do: :msgpack.pack(data, [])
- @impl true
- def serialize(data), do: {:ok, :msgpack.pack(data, [])}
- @impl true
- def deserialize!(data) do
- {:ok, res} = :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}])
- res
- end
-
- @impl true
- def deserialize(data),
- do: :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}])
-end
+++ /dev/null
-defmodule Wampex do
- @moduledoc """
- Types for wampex
- """
- @type message_part :: integer() | binary() | map() | list()
- @type message :: nonempty_list(message_part())
- @type arg_list :: [] | nonempty_list(any())
- @type arg_keyword :: map()
-
- @type hello :: {:hello, realm :: String.t(), details :: map()}
-
- @type authenticate :: {:authenticate, signature :: binary(), details :: map()}
-
- @type call ::
- {:call, request_id :: integer(), details :: map(), procedure :: String.t(),
- arg_list :: arg_list(), arg_keywords :: arg_keyword()}
-
- @type yield ::
- {:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(),
- arg_keywords :: arg_keyword()}
-
- @type publish ::
- {:publish, request_id :: integer(), details :: map(), topic :: String.t(),
- arg_list :: arg_list(), arg_keywords :: arg_keyword()}
-
- @type register ::
- {:register, request_id :: integer(), details :: map(), procedure :: String.t()}
-
- @type unregister :: {:unregister, request_id :: integer(), registration_id :: integer()}
-
- @type unsubscribe ::
- {:unsubscribe, request_id :: integer(), id :: integer()}
-
- @type subscribe ::
- {:subscribe, request_id :: integer(), details :: map(), topic :: String.t()}
- @type invocation ::
- {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(),
- arg_list :: arg_list(), arg_keywords :: arg_keyword()}
- @type event ::
- {:event, subscription_id :: integer(), publication_id :: integer(), details :: map(),
- arg_list :: arg_list(), arg_keyword :: arg_keyword()}
- @type error ::
- {:error, reason :: binary()}
- | {:error, type :: integer(), error :: binary(), details :: map(),
- arg_list :: arg_list(), arg_keyword :: arg_keyword()}
- @type messages ::
- publish()
- | hello()
- | authenticate()
- | unsubscribe()
- | subscribe()
- | invocation()
- | register()
- | unregister()
- | call()
- | yield()
- | error()
- | event()
-
- @type handle_response ::
- {:ok, integer()}
- | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()}
- | {:update, atom(), messages()}
-end
-defmodule Wampex.MixProject do
+defmodule Wampex.Client.MixProject do
use Mix.Project
def project do
[
- app: :wampex,
+ app: :wampex_client,
version: "0.1.0",
elixir: "~> 1.9",
start_permanent: Mix.env() == :prod,
defp deps do
[
- # {:cluster_kv, path: "../cluster_kv"},
- {:cluster_kv,
- git: "https://gitlab.com/entropealabs/cluster_kv.git",
- tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
- {:cors_plug, "~> 2.0"},
{:credo, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
- {:ecto_sql, "~> 3.0"},
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
{:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false},
{:jason, "~> 1.1"},
{:msgpack, "~> 0.7.0"},
{:pbkdf2, "~> 2.0"},
- {:plug_cowboy, "~> 2.1"},
- {:postgrex, ">= 0.0.0"},
{:states_language, "~> 0.2"},
- {:websockex, "~> 0.4"}
+ {:wampex, path: "../wampex"},
+ {:websockex, "~> 0.4.2"}
]
end
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
- "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "4c36b8d68f40711cd167edb9917d32a992f49561", [tag: "4c36b8d68f40711cd167edb9917d32a992f49561"]},
- "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},
"conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
- "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"},
- "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"},
- "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
- "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"},
- "credo": {:hex, :credo, "1.2.2", "f57faf60e0a12b0ba9fd4bad07966057fde162b33496c509b95b027993494aab", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8f2623cd8c895a6f4a55ef10f3fdf6a55a9ca7bef09676bd835551687bf8a740"},
- "db_connection": {:hex, :db_connection, "2.2.1", "caee17725495f5129cb7faebde001dc4406796f12a62b8949f4ac69315080566", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "2b02ece62d9f983fcd40954e443b7d9e6589664380e5546b2b9b523cd0fb59e1"},
- "decimal": {:hex, :decimal, "1.8.1", "a4ef3f5f3428bdbc0d35374029ffcf4ede8533536fa79896dd450168d9acdf3c", [:mix], [], "hexpm", "3cb154b00225ac687f6cbd4acc4b7960027c757a5152b369923ead9ddbca7aec"},
+ "credo": {:hex, :credo, "1.3.1", "082e8d9268a489becf8e7aa75671a7b9088b1277cd6c1b13f40a55554b3f5126", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "0da816ed52fa520b9ea0e5d18a0d3ca269e0bd410b1174d88d8abd94be6cce3c"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
- "ecto": {:hex, :ecto, "3.3.4", "95b05c82ae91361475e5491c9f3ac47632f940b3f92ae3988ac1aad04989c5bb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "9b96cbb83a94713731461ea48521b178b0e3863d310a39a3948c807266eebd69"},
- "ecto_sql": {:hex, :ecto_sql, "3.3.4", "aa18af12eb875fbcda2f75e608b3bd534ebf020fc4f6448e4672fcdcbb081244", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.4 or ~> 3.3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.3.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5eccbdbf92e3c6f213007a82d5dbba4cd9bb659d1a21331f89f408e4c0efd7a8"},
"elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
- "excoveralls": {:hex, :excoveralls, "0.12.2", "a513defac45c59e310ac42fcf2b8ae96f1f85746410f30b1ff2b710a4b6cd44b", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "151c476331d49b45601ffc45f43cb3a8beb396b02a34e3777fea0ad34ae57d89"},
+ "excoveralls": {:hex, :excoveralls, "0.12.3", "2142be7cb978a3ae78385487edda6d1aff0e482ffc6123877bb7270a8ffbcfe0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "568a3e616c264283f5dea5b020783ae40eef3f7ee2163f7a67cbd7b35bcadada"},
"hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "e0100f8ef7d1124222c11ad362c857d3df7cb5f4204054f9f0f4a728666591fc"},
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"},
- "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
+ "jason": {:hex, :jason, "1.2.0", "10043418c42d2493d0ee212d3fddd25d7ffe484380afad769a0a38795938e448", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "116747dbe057794c3a3e4e143b7c8390b29f634e16c78a7f59ba75bfa6852e7f"},
"json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
- "jsx": {:hex, :jsx, "2.10.0", "77760560d6ac2b8c51fd4c980e9e19b784016aa70be354ce746472c33beb0b1c", [:rebar3], [], "hexpm", "9a83e3704807298016968db506f9fad0f027de37546eb838b3ae1064c3a0ad62"},
- "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
- "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
- "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"msgpack": {:hex, :msgpack, "0.7.0", "128ae0a2227c7e7a2847c0f0f73551c268464f8c1ee96bffb920bc0a5712b295", [:rebar3], [], "hexpm", "4649353da003e6f438d105e4b1e0f17757f6f5ec8687a6f30875ff3ac4ce2a51"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"pbkdf2": {:hex, :pbkdf2, "2.0.0", "11c23279fded5c0027ab3996cfae77805521d7ef4babde2bd7ec04a9086cf499", [:rebar3], [], "hexpm", "1e793ce6fdb0576613115714deae9dfc1d1537eaba74f07efb36de139774488d"},
- "plug": {:hex, :plug, "1.9.0", "8d7c4e26962283ff9f8f3347bd73838e2413fbc38b7bb5467d5924f68f3a5a4a", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "9902eda2c52ada2a096434682e99a2493f5d06a94d6ac6bcfff9805f952350f1"},
- "plug_cowboy": {:hex, :plug_cowboy, "2.1.2", "8b0addb5908c5238fac38e442e81b6fcd32788eaa03246b4d55d147c47c5805e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7d722581ce865a237e14da6d946f92704101740a256bd13ec91e63c0b122fc70"},
- "plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"},
- "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
- "postgrex": {:hex, :postgrex, "0.15.3", "5806baa8a19a68c4d07c7a624ccdb9b57e89cbc573f1b98099e3741214746ae4", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "4737ce62a31747b4c63c12b20c62307e51bb4fcd730ca0c32c280991e0606c90"},
- "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
"states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
add(:salt, :string, null: false)
add(:iterations, :integer, null: false)
add(:keylen, :integer, null: false)
- add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all))
+ add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false)
timestamps()
end
use ExUnit.Case, async: true
doctest Wampex
- alias Wampex.{Authentication, Realm}
+ alias Wampex.Client.{Authentication, Realm}
alias Wampex.Client
alias Wampex.Roles.{Broker, Callee, Caller, Dealer, Peer, Publisher, Subscriber}
alias Wampex.Router
assert is_binary(id)
end
+ @tag :client
+ test "admin callee is invoked and responds with error" do
+ caller_name = TestAdminErrorCaller
+ Client.start_link(name: caller_name, session: @session)
+
+ assert {:error, 48, "Error registering user", %{}, [], %{}} =
+ Client.send_request(
+ caller_name,
+ Caller.call(%Call{
+ procedure: "admin.create_user",
+ arg_kw: %{authid: "chris", password: "woot!", realm: "not.real"}
+ })
+ )
+ end
+
@tag :client
test "callee is invoked and responds and caller gets result" do
callee_name = TestCalleeRespond