]> Entropealabs - wampex.git/commitdiff
moving router and client to their own repos
authorChristopher <chris@entropealabs.com>
Fri, 20 Mar 2020 21:40:49 +0000 (16:40 -0500)
committerChristopher <chris@entropealabs.com>
Fri, 20 Mar 2020 21:40:49 +0000 (16:40 -0500)
20 files changed:
lib/authentication.ex [deleted file]
lib/client.ex [deleted file]
lib/client/session.ex [deleted file]
lib/client/transports/web_socket.ex [deleted file]
lib/realm.ex [deleted file]
lib/router.ex [deleted file]
lib/router/admin.ex [deleted file]
lib/router/authentication.ex [deleted file]
lib/router/authentication/ensure_default_admin.ex [deleted file]
lib/router/authentication/realm.ex [deleted file]
lib/router/authentication/repo.ex [deleted file]
lib/router/authentication/user.ex [deleted file]
lib/router/proxy.ex [deleted file]
lib/router/rest.ex [deleted file]
lib/router/session.ex [deleted file]
lib/router/transports/web_socket.ex [deleted file]
lib/transport.ex [deleted file]
mix.exs
priv/repo/migrations/20200319210508_create_realms.exs
test/wampex_test.exs

diff --git a/lib/authentication.ex b/lib/authentication.ex
deleted file mode 100644 (file)
index 419fb2d..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-defmodule Wampex.Authentication do
-  @moduledoc false
-
-  alias Wampex.Crypto
-
-  @enforce_keys [:authid, :authmethods, :secret]
-  defstruct [:authid, :authmethods, :secret]
-
-  @callback handle(challenge :: binary(), auth :: __MODULE__.t()) :: {binary(), map()}
-
-  @type t :: %__MODULE__{
-          authid: binary(),
-          authmethods: [binary()],
-          secret: binary()
-        }
-
-  def handle(
-        {"wampcra", %{"challenge" => ch, "salt" => salt, "iterations" => it, "keylen" => len}},
-        auth
-      ) do
-    {auth
-     |> Map.get(:secret)
-     |> Crypto.pbkdf2(salt, it, len)
-     |> Crypto.hash_challenge(ch), %{}}
-  end
-
-  def handle({"wampcra", %{"challenge" => ch}}, auth) do
-    {auth
-     |> Map.get(:secret)
-     |> Crypto.hash_challenge(ch), %{}}
-  end
-end
diff --git a/lib/client.ex b/lib/client.ex
deleted file mode 100644 (file)
index b00a29a..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-defmodule Wampex.Client do
-  @moduledoc """
-  Documentation for Wampex.
-  """
-  use Supervisor
-
-  alias Wampex.Client.Session, as: Sess
-  alias Wampex.Roles.{Callee, Subscriber}
-  alias Wampex.Roles.Callee.Register
-  alias Wampex.Roles.Subscriber.Subscribe
-
-  @spec start_link(name: atom(), session_data: Sess.t()) ::
-          {:ok, pid()}
-          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
-  def start_link(name: name, session: session_data) when is_atom(name) do
-    Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
-  end
-
-  @spec init({atom(), Sess.t()}) ::
-          {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
-  def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
-    session = session_name(name)
-    subscriber_registry = subscriber_registry_name(name)
-    callee_registry = callee_registry_name(name)
-    transport = transport_name(name)
-    session_data = %Sess{session_data | name: name, transport_pid: transport}
-
-    children = [
-      {Sess, [{:local, session}, session_data, []]},
-      {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
-      {Registry,
-       [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
-      {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
-    ]
-
-    Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
-  end
-
-  @spec session_name(module()) :: module()
-  def session_name(name), do: Module.concat([name, Session])
-  @spec subscriber_registry_name(module()) :: module()
-  def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
-  @spec callee_registry_name(module()) :: module()
-  def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
-  @spec transport_name(module()) :: module()
-  def transport_name(name), do: Module.concat([name, Transport])
-
-  @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
-          {:ok, integer()}
-  def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
-    {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
-    Registry.register(subscriber_registry_name(name), id, t)
-    {:ok, id}
-  end
-
-  @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
-          {:ok, integer()}
-  def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
-    {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
-    Registry.register(callee_registry_name(name), id, p)
-    {:ok, id}
-  end
-
-  @spec send_request(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
-          term()
-  def send_request(name, request, timeout \\ 5000) do
-    Sess.send_request(session_name(name), request, timeout)
-  end
-
-  @spec cast_send_request(name :: module(), Wampex.message()) :: :ok
-  def cast_send_request(name, request) do
-    Sess.cast_send_request(session_name(name), request)
-  end
-end
diff --git a/lib/client/session.ex b/lib/client/session.ex
deleted file mode 100644 (file)
index 5c55a1d..0000000
+++ /dev/null
@@ -1,451 +0,0 @@
-defmodule Wampex.Client.Session do
-  @moduledoc """
-  A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
-  """
-  use StatesLanguage, data: "priv/client.json"
-
-  @max_id 9_007_199_254_740_992
-
-  alias StatesLanguage, as: SL
-  alias Wampex.Realm
-  alias Wampex.Roles.Peer
-  alias Wampex.Roles.Peer.{Authenticate, Hello}
-  alias Wampex.Serializers.MessagePack
-  alias __MODULE__, as: Sess
-  alias Wampex.Client
-  alias Wampex.Client.Transports.WebSocket
-
-  @yield 70
-  @hello 1
-  @abort 3
-
-  @enforce_keys [:url, :roles]
-
-  defstruct [
-    :id,
-    :url,
-    :transport_pid,
-    :message,
-    :event,
-    :invocation,
-    :goodbye,
-    :error,
-    :realm,
-    :name,
-    :roles,
-    :challenge,
-    :interrupt,
-    message_queue: [],
-    request_id: 0,
-    protocol: "wamp.2.msgpack",
-    transport: WebSocket,
-    serializer: MessagePack,
-    requests: []
-  ]
-
-  @type t :: %__MODULE__{
-          id: integer() | nil,
-          url: binary(),
-          transport_pid: pid() | module() | nil,
-          message: Wampex.message() | nil,
-          event: Wampex.event() | nil,
-          invocation: Wampex.invocation() | nil,
-          goodbye: binary() | nil,
-          realm: Realm.t(),
-          name: module() | nil,
-          roles: [module()],
-          challenge: {binary(), binary()} | nil,
-          interrupt: integer() | nil,
-          message_queue: [],
-          request_id: integer(),
-          protocol: binary(),
-          transport: module(),
-          serializer: module(),
-          requests: []
-        }
-
-  ## States
-  @established "Established"
-  @init "Init"
-  @challenge "Challenge"
-  @abort "Abort"
-  @goodbye "Goodbye"
-  @event "Event"
-  @invocation "Invocation"
-  @interrupt "Interrupt"
-  @handshake "Handshake"
-
-  ## Resources
-  @init_transport "InitTransport"
-  @wait_transport "WaitForTransport"
-  @hello "Hello"
-  @handle_challenge "HandleChallenge"
-  @handle_established "HandleEstablished"
-  @handle_abort "HandleAbort"
-  @handle_goodbye "HandleGoodbye"
-  @handle_event "HandleEvent"
-  @handle_invocation "HandleInvocation"
-  @handle_message "HandleMessage"
-  @handle_interrupt "HandleInterrupt"
-  @handle_handshake "HandleHandshake"
-
-  @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) ::
-          :ok
-  def cast_send_request(name, request, pid) do
-    __MODULE__.cast(name, {:send_request, request, pid})
-  end
-
-  @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
-  def cast_send_request(name, request) do
-    __MODULE__.cast(name, {:send_request, request})
-  end
-
-  @spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) ::
-          term()
-  def send_request(name, request, timeout) do
-    __MODULE__.call(name, {:send_request, request}, timeout)
-  end
-
-  @impl true
-  def handle_call(
-        {:send_request, request},
-        from,
-        @established,
-        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
-      ) do
-    request_id = do_send(r_id, tt, t, request)
-
-    {:ok,
-     %SL{
-       data
-       | data: %Sess{
-           sess
-           | request_id: request_id,
-             requests: [{request_id, from} | sess.requests]
-         }
-     }, []}
-  end
-
-  @impl true
-  def handle_call(
-        {:send_request, request},
-        from,
-        _,
-        %SL{data: %Sess{message_queue: mq} = sess} = data
-      ) do
-    Logger.debug("Queueing request: #{inspect(request)}")
-    {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []}
-  end
-
-  @impl true
-  def handle_cast(
-        {:send_request, request, from},
-        @established,
-        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
-      ) do
-    request_id = do_send(r_id, tt, t, request)
-
-    {:ok,
-     %SL{
-       data
-       | data: %Sess{
-           sess
-           | request_id: request_id,
-             requests: [{request_id, from} | sess.requests]
-         }
-     }, []}
-  end
-
-  @impl true
-  def handle_cast(
-        {:send_request, request},
-        @established,
-        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
-      ) do
-    request_id = do_send(r_id, tt, t, request)
-
-    {:ok,
-     %SL{
-       data
-       | data: %Sess{sess | request_id: request_id}
-     }, []}
-  end
-
-  @impl true
-  def handle_cast(
-        {:send_request, request},
-        _,
-        %SL{data: %Sess{message_queue: mq} = sess} = data
-      ) do
-    Logger.debug("Queueing request: #{inspect(request)}")
-    {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @init_transport,
-        _,
-        @wait_transport,
-        data
-      ) do
-    Logger.debug("Waiting for transport to connect...")
-    {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @hello,
-        _,
-        @init,
-        %SL{
-          data: %Sess{
-            transport: tt,
-            transport_pid: t,
-            roles: roles,
-            realm: %Realm{name: realm, authentication: auth}
-          }
-        } = data
-      ) do
-    message = maybe_authentication(%Hello{realm: realm, roles: roles}, auth)
-    tt.send_request(t, Peer.hello(message))
-    {:ok, data, [{:next_event, :internal, :hello_sent}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_challenge,
-        _,
-        @challenge,
-        %SL{
-          data: %Sess{
-            transport: tt,
-            transport_pid: t,
-            challenge: challenge,
-            realm: %Realm{authentication: auth}
-          }
-        } = sl
-      ) do
-    {signature, extra} = auth.__struct__.handle(challenge, auth)
-    tt.send_request(t, Peer.authenticate(%Authenticate{signature: signature, extra: extra}))
-    {:ok, sl, [{:next_event, :internal, :challenged}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @goodbye,
-        _,
-        @handle_goodbye,
-        %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
-      ) do
-    tt.send_message(t, Peer.goodbye(goodbye))
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_message,
-        _,
-        _,
-        %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
-      ) do
-    Logger.debug("Handling Message #{inspect(msg)}")
-    {actions, id, response} = handle_message(msg, roles)
-    {%SL{data: sess} = data, response} = maybe_update_response(data, response)
-    {requests, actions} = resp = handle_response(id, actions, requests, response)
-    Logger.debug("Response: #{inspect(resp)}")
-    {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
-  end
-
-  @impl true
-  def handle_resource(@handle_abort, _, @abort, data) do
-    Logger.warn("Aborting: #{data.data.error}")
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_established,
-        _,
-        @established,
-        %SL{
-          data:
-            %Sess{
-              request_id: r_id,
-              transport: tt,
-              transport_pid: t,
-              message_queue: mq,
-              requests: reqs
-            } = sess
-        } = data
-      ) do
-    {request_id, requests} = send_message_queue(r_id, mq, tt, t, reqs)
-    requests = remove_cast_requests(requests)
-
-    {:ok,
-     %SL{
-       data
-       | data: %Sess{sess | requests: requests, request_id: request_id, message_queue: []}
-     }, []}
-  end
-
-  @impl true
-  def handle_resource(@handle_handshake, _, @handshake, data) do
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_event,
-        _,
-        @event,
-        %SL{data: %Sess{name: name, event: event}} = sl
-      ) do
-    Logger.debug("Received Event #{inspect(event)}")
-
-    sub = Client.subscriber_registry_name(name)
-
-    Registry.dispatch(sub, elem(event, 1), fn entries ->
-      for {pid, _topic} <- entries, do: send(pid, event)
-    end)
-
-    {:ok, sl, [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_invocation,
-        _,
-        @invocation,
-        %SL{data: %Sess{name: name, invocation: invocation}} = sl
-      ) do
-    Logger.debug("Received Invocation #{inspect(invocation)}")
-    reg = Client.callee_registry_name(name)
-
-    Registry.dispatch(reg, elem(invocation, 2), fn entries ->
-      for {pid, _procedure} <- entries, do: send(pid, invocation)
-    end)
-
-    {:ok, sl, [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_interrupt,
-        _,
-        @interrupt,
-        %SL{data: %Sess{interrupt: {id, opts}, name: name}} = data
-      ) do
-    [{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id)
-    send(callee, {:interrupt, id, opts})
-    {:ok, data, [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(resource, _, state, data) do
-    Logger.error("No specific resource handler for #{resource} in state #{state}")
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
-    {:ok, %SL{data | data: %Sess{sess | message: message}},
-     [{:next_event, :internal, :message_received}]}
-  end
-
-  defp handle_response(nil, actions, requests, _), do: {requests, actions}
-
-  defp handle_response(id, actions, requests, response) do
-    Enum.reduce_while(requests, {requests, actions}, fn
-      {^id, nil}, _ ->
-        {:halt, {remove_request(id, requests), actions}}
-
-      {^id, from}, _ ->
-        {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
-
-      {_, _}, acc ->
-        {:cont, acc}
-    end)
-  end
-
-  defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
-    send(from, {:progress, arg_l, arg_kw})
-    []
-  end
-
-  defp get_response_action(from, response), do: {:reply, from, response}
-
-  defp maybe_update_response(data, {:update, key, resp}) do
-    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
-  end
-
-  defp maybe_update_response(data, resp), do: {data, resp}
-
-  defp do_send(r_id, tt, t, message) do
-    {request_id, message} = maybe_inject_request_id(r_id, message)
-    tt.send_request(t, remove_nil_values(message))
-    request_id
-  end
-
-  defp maybe_authentication(message, nil), do: message
-
-  defp maybe_authentication(%Hello{} = message, auth) do
-    %Hello{message | options: Map.from_struct(auth)}
-  end
-
-  defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
-
-  defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
-  defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message}
-  defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message}
-
-  defp maybe_inject_request_id(r_id, message) do
-    request_id = get_request_id(r_id)
-    {request_id, List.insert_at(message, 1, request_id)}
-  end
-
-  defp get_request_id(current_id) when current_id == @max_id do
-    1
-  end
-
-  defp get_request_id(current_id) do
-    current_id + 1
-  end
-
-  defp remove_request(id, requests) do
-    Enum.filter(requests, fn
-      {^id, _} -> false
-      {_id, _} -> true
-    end)
-  end
-
-  defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}
-
-  defp send_message_queue(r_id, mq, tt, t, reqs) do
-    mq
-    |> Enum.reverse()
-    |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
-      r = do_send(id, tt, t, request)
-      {r, [{r, from} | requests]}
-    end)
-  end
-
-  def remove_cast_requests([]), do: []
-
-  def remove_cast_requests(requests) do
-    Enum.filter(requests, fn
-      {_, nil} -> false
-      {_, _} -> true
-    end)
-  end
-
-  defp handle_message(msg, roles) do
-    Enum.reduce_while(roles, nil, fn r, _ ->
-      try do
-        res = r.handle(msg)
-        {:halt, res}
-      rescue
-        FunctionClauseError -> {:cont, nil}
-      end
-    end)
-  end
-end
diff --git a/lib/client/transports/web_socket.ex b/lib/client/transports/web_socket.ex
deleted file mode 100644 (file)
index b0c3d85..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-defmodule Wampex.Client.Transports.WebSocket do
-  use WebSockex
-
-  @behaviour Wampex.Transport
-
-  alias WebSockex.Conn
-
-  require Logger
-
-  @header "Sec-WebSocket-Protocol"
-  @ping_interval 20_000
-
-  @impl true
-  def send_request(t, data) do
-    WebSockex.cast(t, {:send_request, data})
-  end
-
-  @impl true
-  def start_link(
-        url: url,
-        session: session,
-        protocol: protocol,
-        serializer: serializer,
-        opts: opts
-      ) do
-    url
-    |> Conn.new(extra_headers: [{@header, protocol}])
-    |> WebSockex.start_link(
-      __MODULE__,
-      %{session: session, serializer: serializer, connected: false},
-      opts
-    )
-  end
-
-  @impl true
-  def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
-    Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}")
-    Process.send_after(self(), :ping, @ping_interval)
-    send(state.session, {:connected, true})
-    {:ok, %{state | connected: true}}
-  end
-
-  @impl true
-  def handle_ping(_ping, state) do
-    {:reply, :pong, state}
-  end
-
-  @impl true
-  def handle_pong(_pong, state) do
-    {:ok, state}
-  end
-
-  @impl true
-  def handle_cast({:send_request, data}, %{serializer: s} = state) do
-    Logger.debug("Sending Request #{inspect(data)}")
-    {:reply, {s.data_type(), s.serialize!(data)}, state}
-  end
-
-  @impl true
-  def handle_cast({:connected, resp}, state) do
-    send(resp, {:connected, state.connected})
-    {:ok, state}
-  end
-
-  @impl true
-  def handle_info(:ping, state) do
-    Process.send_after(self(), :ping, @ping_interval)
-    {:reply, :ping, state}
-  end
-
-  @impl true
-  def handle_frame({type, message}, %{serializer: s} = state) do
-    message = s.deserialize!(message)
-    Logger.debug("Received #{type} data: #{inspect(message)}")
-    send(state.session, {:set_message, message})
-    {:ok, state}
-  end
-end
diff --git a/lib/realm.ex b/lib/realm.ex
deleted file mode 100644 (file)
index d218bbd..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-defmodule Wampex.Realm do
-  @moduledoc "Defines the WAMP Realm"
-  alias Wampex.Authentication
-
-  @enforce_keys [:name]
-  defstruct [:name, :authentication]
-
-  @type t :: %__MODULE__{
-          name: binary(),
-          authentication: Authentication.t() | nil
-        }
-end
diff --git a/lib/router.ex b/lib/router.ex
deleted file mode 100644 (file)
index 2650155..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-defmodule Wampex.Router do
-  use Supervisor
-
-  alias Wampex.Router.{Admin, Authentication, Proxy, REST}
-  alias Wampex.Router.Authentication.EnsureDefaultAdmin
-  alias Wampex.Router.Transports.WebSocket
-
-  @spec start_link(
-          name: module(),
-          port: pos_integer(),
-          topologies: [],
-          replicas: integer(),
-          quorum: integer(),
-          admin_realm: String.t(),
-          admin_authid: String.t(),
-          admin_password: String.t()
-        ) ::
-          {:ok, pid()}
-          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
-  def start_link(
-        name: name,
-        port: port,
-        topologies: topologies,
-        replicas: repls,
-        quorum: quorum,
-        admin_realm: admin_realm,
-        admin_authid: admin_authid,
-        admin_password: admin_password
-      )
-      when is_atom(name) do
-    Supervisor.start_link(
-      __MODULE__,
-      {name, port, topologies, repls, quorum, admin_realm, admin_authid, admin_password},
-      name: name
-    )
-  end
-
-  @spec init(
-          {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(),
-           String.t()}
-        ) ::
-          {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
-  def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do
-    children = [
-      Authentication.Repo,
-      {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]},
-      {ClusterKV,
-       [
-         name: db_name(name),
-         topologies: topologies,
-         replicas: replicas,
-         quorum: quorum
-       ]},
-      {Proxy, [name: proxy_name(name)]},
-      {Admin,
-       [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]},
-      {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
-    ]
-
-    Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10)
-  end
-
-  def db_name(name), do: Module.concat([name, KV])
-  def proxy_name(name), do: Module.concat([name, Proxy])
-  def admin_name(name), do: Module.concat([name, Admin])
-
-  defp dispatch(name) do
-    [
-      {:_,
-       [
-         {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
-         {:_, Plug.Cowboy.Handler, {REST, []}}
-       ]}
-    ]
-  end
-end
diff --git a/lib/router/admin.ex b/lib/router/admin.ex
deleted file mode 100644 (file)
index 51866dd..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-defmodule Wampex.Router.Admin do
-  @moduledoc false
-  use GenServer
-  require Logger
-  alias Wampex.Roles.Dealer.{Invocation, Result}
-  alias Wampex.Router.Authentication.{User, Realm}
-  alias Wampex.Router.Session
-
-  @procedures [
-    "admin.create_user",
-    "admin.create_realm"
-  ]
-
-  def start_link(name: name, db: db, realm: realm, proxy: proxy) do
-    GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name)
-  end
-
-  def init({db, realm, proxy}) do
-    {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}}
-  end
-
-  def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do
-    regs = register_procedures(db, realm, regs)
-    {:noreply, %{state | regs: regs}}
-  end
-
-  def register_procedures(db, realm, regs) do
-    Enum.reduce(@procedures, regs, fn proc, acc ->
-      val = {Session.get_global_id(), {self(), Node.self()}}
-      ClusterKV.put(db, realm, proc, val)
-      [{proc, val} | acc]
-    end)
-  end
-
-  def handle_info(
-        {req_id,
-         %Invocation{
-           arg_kw: %{"realm" => realm, "authid" => authid, "password" => password},
-           options: %{"procedure" => "admin.create_user"}
-         } = event, {pid, node}},
-        %{proxy: proxy} = state
-      ) do
-    realm = Realm.get(uri: realm)
-    %User{id: id} = User.create(authid: authid, password: password, realm: realm)
-    Logger.info("Admin handled event: #{inspect(event)}")
-    send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid})
-    {:noreply, state}
-  end
-end
diff --git a/lib/router/authentication.ex b/lib/router/authentication.ex
deleted file mode 100644 (file)
index 5ebf970..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-defmodule Wampex.Router.Authentication do
-  @moduledoc false
-
-  require Logger
-
-  alias Wampex.Crypto
-  alias Wampex.Serializers.JSON
-  alias Wampex.Router.Authentication.{Realm, User}
-
-  @wampcra "wampcra"
-  @auth_provider "userdb"
-  @auth_role "user"
-
-  def authenticate?(methods) do
-    can_auth(methods)
-  end
-
-  def method, do: @wampcra
-
-  def challenge(realm, authid, session_id) do
-    %Realm{} = realm = Realm.get(uri: realm)
-    %User{} = user = User.get(authid: authid, realm: realm)
-    now = DateTime.to_iso8601(DateTime.utc_now())
-
-    %{
-      challenge:
-        JSON.serialize!(%{
-          nonce: Crypto.random_string(user.keylen),
-          authprovider: @auth_provider,
-          authid: authid,
-          timestamp: now,
-          authrole: @auth_role,
-          authmethod: @wampcra,
-          session: session_id
-        }),
-      salt: user.salt,
-      keylen: user.keylen,
-      iterations: user.iterations
-    }
-  end
-
-  def authenticate(signature, realm, authid, %{
-        challenge: challenge
-      }) do
-    authid
-    |> get_secret(realm)
-    |> Crypto.hash_challenge(challenge)
-    |> :pbkdf2.compare_secure(signature)
-  end
-
-  defp get_secret(authid, uri) do
-    realm = Realm.get(uri: uri)
-    %User{password: password} = User.get(authid: authid, realm: realm)
-    password
-  end
-
-  defp can_auth([]), do: false
-  defp can_auth([@wampcra | _]), do: true
-  defp can_auth([_ | t]), do: can_auth(t)
-end
diff --git a/lib/router/authentication/ensure_default_admin.ex b/lib/router/authentication/ensure_default_admin.ex
deleted file mode 100644 (file)
index 9052191..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-defmodule Wampex.Router.Authentication.EnsureDefaultAdmin do
-  @moduledoc false
-  require Logger
-  use GenServer
-
-  alias Wampex.Router.Authentication.{Realm, User}
-
-  def start_link(uri: uri, authid: authid, password: password) do
-    GenServer.start_link(__MODULE__, {uri, authid, password})
-  end
-
-  def init({uri, authid, password}) do
-    %Realm{} = realm = Realm.create(uri: uri)
-    %User{} = user = User.create(authid: authid, password: password, realm: realm)
-    Logger.info("Realm: #{inspect(realm)}")
-    Logger.info("User: #{inspect(user)}")
-    :ignore
-  end
-end
diff --git a/lib/router/authentication/realm.ex b/lib/router/authentication/realm.ex
deleted file mode 100644 (file)
index 10fa68b..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-defmodule Wampex.Router.Authentication.Realm do
-  @moduledoc """
-  CREATE TABLE authentication.realms (
-        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-        uri STRING(255) NOT NULL UNIQUE,
-        parent STRING NULL,
-        inserted_at TIMESTAMP NOT NULL,
-        updated_at TIMESTAMP NOT NULL
-  );
-
-  """
-  use Ecto.Schema
-
-  alias __MODULE__
-  alias Wampex.Router.Authentication.Repo
-
-  @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
-  @foreign_key_type :binary_id
-  schema "realms" do
-    field(:uri, :string)
-    field(:parent, :string)
-    timestamps()
-  end
-
-  def get(uri: uri) do
-    Repo.get_by(Realm, uri: uri)
-  end
-
-  def create(uri: uri) do
-    try do
-      {:ok, r} =
-        %Realm{uri: uri, parent: uri}
-        |> Repo.insert()
-
-      r
-    rescue
-      _er ->
-        get(uri: uri)
-    end
-  end
-end
diff --git a/lib/router/authentication/repo.ex b/lib/router/authentication/repo.ex
deleted file mode 100644 (file)
index 9fe8e40..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-defmodule Wampex.Router.Authentication.Repo do
-  use Ecto.Repo,
-    otp_app: :wampex,
-    adapter: Ecto.Adapters.Postgres
-end
diff --git a/lib/router/authentication/user.ex b/lib/router/authentication/user.ex
deleted file mode 100644 (file)
index ea470af..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-defmodule Wampex.Router.Authentication.User do
-  @moduledoc """
-  CREATE TABLE authentication.users (
-        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-        authid STRING(255) NOT NULL,
-        password STRING NOT NULL,
-        salt STRING NOT NULL,
-        iterations INT NOT NULL,
-        keylen INT NOT NULL,
-        realm_id UUID NOT NULL REFERENCES authentication.realms (id) ON DELETE CASCADE,
-        inserted_at TIMESTAMP NOT NULL,
-        updated_at TIMESTAMP NOT NULL,
-        UNIQUE (authid, realm_id),
-        INDEX (authid)
-  );
-
-  """
-
-  use Ecto.Schema
-  alias __MODULE__
-  alias Ecto.Migration
-  alias Wampex.Crypto
-  alias Wampex.Router.Authentication.{Realm, Repo}
-
-  @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
-  @foreign_key_type :binary_id
-  schema "users" do
-    field(:authid, :string)
-    field(:password, :string)
-    field(:salt, :string)
-    field(:iterations, :integer)
-    field(:keylen, :integer)
-    belongs_to(:realm, Realm)
-    timestamps()
-  end
-
-  def get(authid: authid, realm: realm) do
-    Repo.get_by(User, authid: authid, realm_id: realm.id)
-  end
-
-  def create(authid: authid, password: password, realm: realm) do
-    keylen = Application.get_env(:wampex, :keylen)
-    salt = Crypto.random_string(keylen)
-    iterations = Enum.random(10_000..50_000)
-
-    try do
-      password = Crypto.pbkdf2(password, salt, iterations, keylen)
-
-      {:ok, u} =
-        %User{
-          authid: authid,
-          password: password,
-          realm: realm,
-          salt: salt,
-          iterations: iterations,
-          keylen: keylen
-        }
-        |> Repo.insert()
-
-      u
-    rescue
-      _er ->
-        get(authid: authid, realm: realm)
-    end
-  end
-end
diff --git a/lib/router/proxy.ex b/lib/router/proxy.ex
deleted file mode 100644 (file)
index 2ddb7d6..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-defmodule Wampex.Router.Proxy do
-  @moduledoc false
-  use GenServer
-
-  def start_link(name: name) do
-    GenServer.start_link(__MODULE__, :ok, name: name)
-  end
-
-  def init(:ok) do
-    {:ok, []}
-  end
-
-  def handle_info({e, to}, state) do
-    send(to, e)
-    {:noreply, state}
-  end
-
-  def handle_call({:is_up, pid}, _from, s), do: {:reply, Process.alive?(pid), s}
-end
diff --git a/lib/router/rest.ex b/lib/router/rest.ex
deleted file mode 100644 (file)
index a15ad87..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-defmodule Wampex.Router.REST do
-  @moduledoc false
-  use Plug.Router
-  import Plug.Conn
-
-  plug(CORSPlug, origin: ["*"])
-  plug(:match)
-  plug(:dispatch)
-
-  get "/favicon.ico" do
-    send_resp(conn, 200, "ok")
-  end
-
-  get "/alivez" do
-    send_resp(conn, 200, "ok")
-  end
-
-  get "/readyz" do
-    send_resp(conn, 200, "ok")
-  end
-
-  match _ do
-    send_resp(conn, 404, "oops")
-  end
-end
diff --git a/lib/router/session.ex b/lib/router/session.ex
deleted file mode 100644 (file)
index 2cd7847..0000000
+++ /dev/null
@@ -1,779 +0,0 @@
-defmodule Wampex.Router.Session do
-  @moduledoc """
-  A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
-  """
-  use StatesLanguage, data: "priv/router.json"
-
-  @max_id 9_007_199_254_740_992
-
-  alias __MODULE__, as: Sess
-  alias StatesLanguage, as: SL
-  alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
-  alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
-  alias Wampex.Router
-  alias Wampex.Serializers.JSON
-  alias Broker.{Subscribed, Published, Event, Unsubscribed}
-  alias Dealer.{Invocation, Registered, Result, Unregistered}
-  alias Router.Authentication
-
-  @enforce_keys [:transport, :transport_pid]
-  defstruct [
-    :id,
-    :db,
-    :proxy,
-    :transport,
-    :transport_pid,
-    :message,
-    :hello,
-    :authenticate,
-    :register,
-    :unregister,
-    :call,
-    :yield,
-    :subscribe,
-    :unsubscribe,
-    :publish,
-    :realm,
-    :name,
-    :goodbye,
-    :peer_information,
-    :challenge,
-    error: "wamp.error.protocol_violation",
-    authentication: Authentication,
-    roles: [Peer, Broker, Dealer],
-    registrations: [],
-    subscriptions: [],
-    invocations: [],
-    message_queue: [],
-    request_id: 0,
-    requests: [],
-    hello_received: false
-  ]
-
-  @type t :: %__MODULE__{
-          id: integer() | nil,
-          db: module() | nil,
-          proxy: module() | nil,
-          transport_pid: pid() | module() | nil,
-          message: Wampex.message() | nil,
-          hello: Wampex.hello() | nil,
-          authenticate: Wampex.authenticate() | nil,
-          register: Wampex.register() | nil,
-          unregister: Wampex.unregister() | nil,
-          call: Wampex.call() | nil,
-          yield: Wampex.yield() | nil,
-          subscribe: Wampex.subscribe() | nil,
-          unsubscribe: Wampex.unsubscribe() | nil,
-          publish: Wampex.publish() | nil,
-          goodbye: binary() | nil,
-          realm: String.t() | nil,
-          name: module() | nil,
-          challenge: map() | nil,
-          roles: [module()],
-          registrations: [],
-          subscriptions: [],
-          invocations: [],
-          message_queue: [],
-          request_id: integer(),
-          requests: [],
-          hello_received: boolean()
-        }
-
-  ## States
-  @init "Init"
-  @established "Established"
-  @hello "Hello"
-  @authenticate "Authenticate"
-  @call "Call"
-  @yield "Yield"
-  @register "Register"
-  @unregister "Unregister"
-  @subscribe "Subscribe"
-  @unsubscribe "Unsubscribe"
-  @publish "Publish"
-  # @error "Error"
-  @abort "Abort"
-  @goodbye "Goodbye"
-
-  ## Resources
-  @handle_init "HandleInit"
-  @handle_established "HandleEstablished"
-  @handle_message "HandleMessage"
-  @handle_hello "HandleHello"
-  @handle_authenticate "HandleAuthenticate"
-  @handle_call "HandleCall"
-  @handle_yield "HandleYield"
-  @handle_register "HandleRegister"
-  @handle_unregister "HandleUnregister"
-  @handle_subscribe "HandleSubscribe"
-  @handle_unsubscribe "HandleUnsubscribe"
-  @handle_publish "HandlePublish"
-  # @handle_interrupt "HandleInterrupt"
-  @handle_abort "HandleAbort"
-  @handle_goodbye "HandleGoodbye"
-  # @handle_cancel "HandleCancel"
-
-  def get_global_id do
-    Enum.random(0..@max_id)
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_init,
-        _,
-        @init,
-        data
-      ) do
-    debug("Init")
-
-    {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
-     [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_established,
-        _,
-        @established,
-        data
-      ) do
-    debug("Established")
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_message,
-        _,
-        _,
-        %SL{data: %Sess{message: msg, roles: roles}} = data
-      ) do
-    debug("Handling Message #{inspect(msg)}")
-
-    {data, actions} =
-      case handle_message(msg, roles) do
-        {actions, _id, response} ->
-          {data, response} = maybe_update_response(data, response)
-          debug("Response: #{inspect(response)}")
-          {data, actions}
-
-        nil ->
-          {data, [{:next_event, :internal, :abort}]}
-      end
-
-    {:ok, data, actions}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_hello,
-        _,
-        @hello,
-        %SL{
-          data:
-            %Sess{
-              id: id,
-              transport: tt,
-              transport_pid: t,
-              hello: {:hello, realm, dets},
-              hello_received: false,
-              authentication: auth
-            } = data
-        } = sl
-      ) do
-    Logger.info("Hello #{inspect(dets)}")
-
-    {actions, challenge} =
-      case dets do
-        %{"authid" => ai, "authmethods" => am} ->
-          case auth.authenticate?(am) do
-            true ->
-              ch = auth.challenge(realm, ai, id)
-
-              chal = %Challenge{
-                auth_method: auth.method(),
-                options: ch
-              }
-
-              send_to_peer(
-                Peer.challenge(chal),
-                tt,
-                t
-              )
-
-              {[], ch}
-
-            false ->
-              {[{:next_event, :internal, :abort}], nil}
-          end
-
-        %{} ->
-          {[{:next_event, :internal, :abort}], nil}
-      end
-
-    {:ok,
-     %SL{
-       sl
-       | data: %Sess{
-           data
-           | challenge: challenge,
-             hello_received: true,
-             realm: realm,
-             peer_information: dets
-         }
-     }, [{:next_event, :internal, :transition}] ++ actions}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_authenticate,
-        _,
-        @authenticate,
-        %SL{
-          data: %Sess{
-            id: session_id,
-            transport: tt,
-            transport_pid: t,
-            authentication: auth,
-            challenge: challenge,
-            realm: realm,
-            authenticate: {:authenticate, sig, _dets}
-          }
-        } = sl
-      ) do
-    ch = JSON.deserialize!(challenge.challenge)
-    authid = get_in(ch, ["authid"])
-    authrole = get_in(ch, ["authrole"])
-    authmethod = get_in(ch, ["authmethod"])
-    authprovider = get_in(ch, ["authprovider"])
-
-    actions =
-      case auth.authenticate(sig, realm, authid, challenge) do
-        true ->
-          send_to_peer(
-            Peer.welcome(%Welcome{
-              session_id: session_id,
-              options: %{
-                agent: "WAMPex Router",
-                authid: authid,
-                authrole: authrole,
-                authmethod: authmethod,
-                authprovider: authprovider,
-                roles: %{broker: %{}, dealer: %{}}
-              }
-            }),
-            tt,
-            t
-          )
-
-          [{:next_event, :internal, :transition}]
-
-        false ->
-          [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]
-      end
-
-    {:ok, sl, actions}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_hello,
-        _,
-        @hello,
-        %SL{
-          data:
-            %Sess{
-              hello_received: true
-            } = data
-        } = sl
-      ) do
-    {:ok,
-     %SL{
-       sl
-       | data: %Sess{
-           data
-           | error: "wamp.error.protocol_violation"
-         }
-     }, [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_subscribe,
-        _,
-        @subscribe,
-        %SL{
-          data:
-            %Sess{
-              transport: tt,
-              transport_pid: t,
-              subscriptions: subs,
-              realm: realm,
-              subscribe: {:subscribe, ri, opts, topic},
-              db: db
-            } = data
-        } = sl
-      ) do
-    id = get_global_id()
-
-    wc =
-      case opts do
-        %{"match" => "wildcard"} ->
-          ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
-          true
-
-        _ ->
-          ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
-          false
-      end
-
-    send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
-
-    {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}},
-     [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_unsubscribe,
-        _,
-        @unsubscribe,
-        %SL{
-          data: %Sess{
-            realm: realm,
-            transport: tt,
-            transport_pid: t,
-            db: db,
-            subscriptions: subs,
-            unsubscribe: {:unsubscribe, rid, subscription_id}
-          }
-        } = sl
-      ) do
-    subs = remove_subscription(subs, subscription_id, realm, db)
-
-    send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
-
-    {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
-     [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_publish,
-        _,
-        @publish,
-        %SL{
-          data: %Sess{
-            proxy: proxy,
-            realm: realm,
-            transport: tt,
-            transport_pid: t,
-            db: db,
-            publish: {:publish, rid, opts, topic, arg_l, arg_kw}
-          }
-        } = sl
-      ) do
-    pub_id = get_global_id()
-
-    {_, _, subs} =
-      {db, {realm, topic}, []}
-      |> get_subscribers()
-      |> get_prefix()
-      |> get_wildcard()
-
-    subs = Enum.uniq(subs)
-
-    Enum.each(subs, fn {id, {pid, node}} ->
-      send(
-        {proxy, node},
-        {%Event{
-           subscription_id: id,
-           publication_id: pub_id,
-           arg_list: arg_l,
-           arg_kw: arg_kw,
-           options: opts
-         }, pid}
-      )
-    end)
-
-    case opts do
-      %{acknowledge: true} ->
-        send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t)
-
-      %{} ->
-        :noop
-    end
-
-    {:ok, sl, [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_unregister,
-        _,
-        @unregister,
-        %SL{
-          data:
-            %Sess{
-              db: db,
-              transport: tt,
-              transport_pid: t,
-              registrations: regs,
-              realm: realm,
-              unregister: {:unregister, request_id, registration_id}
-            } = data
-        } = sl
-      ) do
-    regs = remove_registration(regs, realm, registration_id, db)
-
-    send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
-    {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_register,
-        _,
-        @register,
-        %SL{
-          data:
-            %Sess{
-              transport: tt,
-              transport_pid: t,
-              registrations: regs,
-              db: db,
-              realm: realm,
-              register: {:register, rid, _opts, procedure}
-            } = data
-        } = sl
-      ) do
-    id = get_global_id()
-    ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
-    regd = %Registered{request_id: rid, registration_id: id}
-    send_to_peer(Dealer.registered(regd), tt, t)
-
-    {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
-     [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_call,
-        _,
-        @call,
-        %SL{
-          data:
-            %Sess{
-              db: db,
-              proxy: proxy,
-              transport: tt,
-              transport_pid: t,
-              realm: realm,
-              request_id: ri,
-              call: {:call, call_id, dets, proc, al, akw}
-            } = data
-        } = sl
-      ) do
-    data =
-      with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
-           {id, {pid, node}} <- get_live_callee(proxy, callees) do
-        req_id = get_request_id(ri)
-        dets = Map.put(dets, "procedure", proc)
-
-        send(
-          {proxy, node},
-          {
-            {
-              call_id,
-              %Invocation{
-                request_id: req_id,
-                registration_id: id,
-                options: dets,
-                arg_list: al,
-                arg_kw: akw
-              },
-              {self(), Node.self()}
-            },
-            pid
-          }
-        )
-
-        %SL{sl | data: %Sess{data | request_id: req_id}}
-      else
-        {:error, :no_live_callees} ->
-          send_to_peer(
-            Caller.call_error(%Error{
-              request_id: call_id,
-              error: "wamp.error.no_callees",
-              details: %{procedure: proc}
-            }),
-            tt,
-            t
-          )
-
-          sl
-
-        :not_found ->
-          send_to_peer(
-            Caller.call_error(%Error{
-              request_id: call_id,
-              error: "wamp.error.no_registration",
-              details: %{procedure: proc}
-            }),
-            tt,
-            t
-          )
-
-          sl
-      end
-
-    {:ok, data, [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_yield,
-        _,
-        @yield,
-        %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
-          data
-      ) do
-    {call_id, _, {pid, node}} =
-      Enum.find(inv, fn
-        {_, %Invocation{request_id: ^id}, _} -> true
-        _ -> false
-      end)
-
-    send(
-      {proxy, node},
-      {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
-    )
-
-    inv =
-      Enum.filter(inv, fn
-        {_, %Invocation{request_id: ^id}, _} -> false
-        _ -> true
-      end)
-
-    {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
-     [{:next_event, :internal, :transition}]}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_abort,
-        _,
-        @abort,
-        %SL{data: %Sess{transport: tt, transport_pid: t, error: er}} = data
-      ) do
-    send_to_peer(Peer.abort(%Abort{reason: er}), tt, t)
-    Logger.warn("Aborting: #{data.data.error}")
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_resource(
-        @handle_goodbye,
-        _,
-        @goodbye,
-        %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
-      ) do
-    send_to_peer(Peer.goodbye(goodbye), tt, t)
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_resource(resource, _, state, data) do
-    Logger.error("No specific resource handler for #{resource} in state #{state}")
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_info(
-        {call_id, %Invocation{} = e, from},
-        _,
-        %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
-      ) do
-    send_to_peer(Dealer.invocation(e), tt, t)
-    {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
-  end
-
-  @impl true
-  def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
-    send_to_peer(Broker.event(e), tt, t)
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
-    send_to_peer(Dealer.result(e), tt, t)
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
-    {:ok, %SL{data | data: %Sess{sess | message: message}},
-     [{:next_event, :internal, :message_received}]}
-  end
-
-  @impl true
-  def handle_info(event, state, data) do
-    Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
-    {:ok, data, []}
-  end
-
-  @impl true
-  def handle_termination(reason, _, %SL{
-        data: %Sess{
-          db: db,
-          transport: tt,
-          transport_pid: t,
-          registrations: regs,
-          realm: realm,
-          subscriptions: subs
-        }
-      }) do
-    Logger.warn("Router session terminating #{inspect(reason)}")
-
-    Enum.each(regs, fn {id, _proc} ->
-      remove_registration(regs, realm, id, db)
-    end)
-
-    Enum.each(subs, fn {id, _proc, _} ->
-      remove_subscription(subs, id, realm, db)
-    end)
-
-    send_to_peer(
-      Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
-      tt,
-      t
-    )
-  end
-
-  defp remove_subscription(subs, subscription_id, realm, db) do
-    {id, topic, wc} =
-      sub =
-      Enum.find(subs, fn
-        {^subscription_id, _topic, _} -> true
-        _ -> false
-      end)
-
-    {key, filter} =
-      case wc do
-        true ->
-          key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
-
-          {key,
-           fn {id, values}, value ->
-             {id,
-              Enum.filter(values, fn
-                {_, ^value} -> false
-                _ -> true
-              end)}
-           end}
-
-        false ->
-          {topic,
-           fn {id, values}, value ->
-             {id, List.delete(values, value)}
-           end}
-      end
-
-    ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
-    List.delete(subs, sub)
-  end
-
-  defp remove_registration(regs, realm, registration_id, db) do
-    {id, proc} =
-      reg =
-      Enum.find(regs, fn
-        {^registration_id, _proc} -> true
-        _ -> false
-      end)
-
-    filter = fn {id, values}, value ->
-      {id, List.delete(values, value)}
-    end
-
-    ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
-    List.delete(regs, reg)
-  end
-
-  defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
-
-  defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
-    case GenServer.call({proxy, node}, {:is_up, pid}) do
-      true -> c
-      false -> get_live_callee(proxy, t)
-    end
-  end
-
-  defp get_subscribers({db, {keyspace, key}, acc}) do
-    case ClusterKV.get(db, keyspace, key, 500) do
-      {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
-      _ -> {db, {keyspace, key}, acc}
-    end
-  end
-
-  defp get_prefix({db, {keyspace, key}, acc}) do
-    l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
-    {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
-  end
-
-  defp get_wildcard({db, {keyspace, key}, acc}) do
-    l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
-    {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
-  end
-
-  defp send_to_peer(msg, transport, pid) do
-    transport.send_request(pid, remove_nil_values(msg))
-  end
-
-  defp maybe_update_response(data, {:update, key, resp}) do
-    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
-  end
-
-  defp maybe_update_response(data, resp), do: {data, resp}
-
-  defp remove_nil_values(message) do
-    message =
-      case Enum.reverse(message) do
-        [map | t] when map == %{} -> t
-        [nil | t] -> t
-        m -> m
-      end
-
-    message =
-      case message do
-        [list | t] when list == [] ->
-          t
-
-        [nil | t] ->
-          t
-
-        m ->
-          m
-      end
-
-    Enum.reverse(message)
-  end
-
-  defp get_request_id(current_id) when current_id == @max_id do
-    1
-  end
-
-  defp get_request_id(current_id) do
-    current_id + 1
-  end
-
-  defp handle_message(msg, roles) do
-    Enum.reduce_while(roles, nil, fn r, _ ->
-      try do
-        res = r.handle(msg)
-        {:halt, res}
-      rescue
-        FunctionClauseError -> {:cont, nil}
-      end
-    end)
-  end
-end
diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex
deleted file mode 100644 (file)
index ae36b57..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-defmodule Wampex.Router.Transports.WebSocket do
-  @moduledoc false
-  @behaviour :cowboy_websocket
-
-  require Logger
-
-  alias __MODULE__
-  alias Wampex.Router.Session
-  alias Wampex.Serializers.{JSON, MessagePack}
-
-  @protocol_header "sec-websocket-protocol"
-  @json 'wamp.2.json'
-  @msgpack "wamp.2.msgpack"
-
-  defstruct [:serializer, :session]
-
-  def send_request(transport, message) do
-    send(transport, {:send_request, message})
-  end
-
-  @impl true
-  def init(req, %{db: db, proxy: proxy}) do
-    {:ok, serializer, protocol} = get_serializer(req)
-    req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
-    {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
-  end
-
-  @impl true
-  def websocket_init({state, db, proxy}) do
-    {:ok, session} = start_session(db, proxy)
-    Logger.info("Websocket Initialized: #{inspect(state)}")
-    {:ok, %WebSocket{state | session: session}}
-  end
-
-  @impl true
-  def websocket_handle({type, payload}, state) do
-    Logger.debug("Received #{type} Payload: #{payload}")
-    handle_payload({payload, state})
-    {:ok, state, :hibernate}
-  end
-
-  @impl true
-  def websocket_handle(:ping, state) do
-    {[:pong], state, :hibernate}
-  end
-
-  @impl true
-  def websocket_handle(unknown, state) do
-    Logger.warn("Unknown websocket frame #{inspect(unknown)}")
-    {:ok, state, :hibernate}
-  end
-
-  @impl true
-  def websocket_info({:send_request, message}, %WebSocket{serializer: s} = state) do
-    {[{s.data_type(), s.serialize!(message)}], state, :hibernate}
-  end
-
-  @impl true
-  def websocket_info(
-        {:EXIT, _pid = session_pid, reason},
-        %WebSocket{session: session_pid} = state
-      ) do
-    Logger.warn("Session ended because #{inspect(reason)}, closing connection")
-    {:stop, state}
-  end
-
-  @impl true
-  def websocket_info(event, state) do
-    Logger.debug("Received unhandled event: #{inspect(event)}")
-    {:ok, state}
-  end
-
-  @impl true
-  def terminate(reason, _req, _state) do
-    Logger.info("Websocket closing for reason #{inspect(reason)}")
-    :ok
-  end
-
-  defp handle_payload({payload, %WebSocket{serializer: s} = state}) do
-    payload
-    |> s.deserialize!()
-    |> handle_event(state)
-  end
-
-  defp handle_event(ms, %WebSocket{session: s} = state) do
-    send(s, {:set_message, ms})
-    {:ok, state}
-  end
-
-  defp start_session(db, proxy) do
-    Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
-  end
-
-  defp get_serializer(req) do
-    @protocol_header
-    |> :cowboy_req.parse_header(req)
-    |> parse_protocol()
-  end
-
-  defp parse_protocol([]), do: Logger.error("Unknown protocol")
-  defp parse_protocol([@json | _]), do: {:ok, JSON, @json}
-  defp parse_protocol([@msgpack | _]), do: {:ok, MessagePack, @msgpack}
-  defp parse_protocol([_ | t]), do: parse_protocol(t)
-end
diff --git a/lib/transport.ex b/lib/transport.ex
deleted file mode 100644 (file)
index b801962..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-defmodule Wampex.Transport do
-  @moduledoc "Behaviour for Transports"
-  @callback send_request(transport :: atom() | pid(), message :: Wampex.message()) :: :ok
-  @callback start_link(
-              url: binary(),
-              session: Wampex.Client.Session.t(),
-              protocol: binary(),
-              serializer: module(),
-              opts: []
-            ) ::
-              {:ok, pid}
-              | {:error, term}
-end
diff --git a/mix.exs b/mix.exs
index 33ccc3e47a730b78af2f7f5cefe369b5fb6ff622..3c600899147624b05be710a8d5ab4b0e15901c02 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -37,23 +37,13 @@ defmodule Wampex.MixProject do
 
   defp deps do
     [
-      # {:cluster_kv, path: "../cluster_kv"},
-      {:cluster_kv,
-       git: "https://gitlab.com/entropealabs/cluster_kv.git",
-       tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
-      {:cors_plug, "~> 2.0"},
       {:credo, "~> 1.2", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
-      {:ecto_sql, "~> 3.0"},
       {:ex_doc, "~> 0.21", only: :dev, runtime: false},
       {:excoveralls, "~> 0.12.2", only: [:dev, :test], runtime: false},
       {:jason, "~> 1.1"},
       {:msgpack, "~> 0.7.0"},
-      {:pbkdf2, "~> 2.0"},
-      {:plug_cowboy, "~> 2.1"},
-      {:postgrex, ">= 0.0.0"},
-      {:states_language, "~> 0.2"},
-      {:websockex, "~> 0.4"}
+      {:pbkdf2, "~> 2.0"}
     ]
   end
 
index 0ec59d6d5ec7cce80dd0a7e826b66f753fe5468f..86f165b78a109574044d8ea7bdcb78ba7f891a26 100644 (file)
@@ -18,7 +18,7 @@ defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do
       add(:salt, :string, null: false)
       add(:iterations, :integer, null: false)
       add(:keylen, :integer, null: false)
-      add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all))
+      add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false)
       timestamps()
     end
 
index 4e874f9e25fa4052ad709821944e110ec4dcef18..b3fbc6a392d24f46a891892584f0b7ee0196c1b2 100644 (file)
@@ -351,6 +351,21 @@ defmodule WampexTest do
     assert is_binary(id)
   end
 
+  @tag :client
+  test "admin callee is invoked and responds with error" do
+    caller_name = TestAdminErrorCaller
+    Client.start_link(name: caller_name, session: @session)
+
+    assert {:error, 48, "Error registering user", %{}, [], %{}} =
+             Client.send_request(
+               caller_name,
+               Caller.call(%Call{
+                 procedure: "admin.create_user",
+                 arg_kw: %{authid: "chris", password: "woot!", realm: "not.real"}
+               })
+             )
+  end
+
   @tag :client
   test "callee is invoked and responds and caller gets result" do
     callee_name = TestCalleeRespond