# Used by "mix format"
[
- inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
+ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
+ line_length: 120
]
defmodule Wampex.Router do
use Supervisor
- alias Wampex.Router.{Admin, Authentication, Proxy, REST}
+ alias Wampex.Router.{Admin, Authentication, Realms, REST}
alias Wampex.Router.Authentication.EnsureDefaultAdmin
alias Wampex.Router.Transports.WebSocket
)
end
- @spec init(
- {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(),
- String.t()}
- ) ::
+ @spec init({module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(), String.t()}) ::
{:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do
children = [
+ {ClusterKV, name: db_name(name), topologies: topologies, replicas: replicas, quorum: quorum},
Authentication.Repo,
{EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]},
- {ClusterKV,
- [
- name: db_name(name),
- topologies: topologies,
- replicas: replicas,
- quorum: quorum
- ]},
- {Proxy, [name: proxy_name(name)]},
- {Admin,
- [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]},
+ {Realms, name: realms_name(name)},
+ {Admin, name: admin_name(name), db: db_name(name), realm: admin_realm, realms: realms_name(name)},
{Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
]
end
def db_name(name), do: Module.concat([name, KV])
- def proxy_name(name), do: Module.concat([name, Proxy])
+ def realms_name(name), do: Module.concat([name, Realms])
def admin_name(name), do: Module.concat([name, Admin])
defp dispatch(name) do
[
{:_,
[
- {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
+ {"/ws", WebSocket, %{db: db_name(name), name: name}},
{:_, Plug.Cowboy.Handler, {REST, []}}
]}
]
alias Wampex.Roles.Dealer.{Invocation, Result}
alias Wampex.Roles.Peer.Error
alias Wampex.Router.Authentication.{User, Realm}
- alias Wampex.Router.Session
+ alias Wampex.Router.{Realms, Session}
@procedures [
"admin.create_user",
"admin.create_realm"
]
- def start_link(name: name, db: db, realm: realm, proxy: proxy) do
- GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name)
+ def start_link(name: name, db: db, realm: realm, realms: realms) do
+ GenServer.start_link(__MODULE__, {db, realm, realms}, name: name)
end
- def init({db, realm, proxy}) do
- {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}}
+ def init({db, realm, realms}) do
+ {:ok, %{db: db, realm: realm, proxy: Realms.start_realm(realms, realm), regs: []}, {:continue, :ok}}
end
def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do
end
def register_procedures(db, realm, regs) do
+ Logger.info("Registering admin procedures: #{inspect(@procedures)}")
+
Enum.reduce(@procedures, regs, fn proc, acc ->
val = {Session.get_global_id(), {self(), Node.self()}}
ClusterKV.put(db, realm, proc, val)
def parse_challenge(challenge) do
ch = JSON.deserialize!(challenge.challenge)
- {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]),
- get_in(ch, ["authprovider"])}
+ {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]), get_in(ch, ["authprovider"])}
end
def authenticate(signature, realm, authid, %{
--- /dev/null
+defmodule Wampex.Router.Realms do
+ @moduledoc false
+ use Supervisor
+ alias Wampex.Router.Realms.Proxy
+ require Logger
+
+ def start_link(name: name) do
+ Supervisor.start_link(__MODULE__, name, name: name)
+ end
+
+ @impl true
+ def init(name) do
+ children = [
+ {DynamicSupervisor, strategy: :one_for_one, name: realm_supervisor_name(name)}
+ ]
+
+ Logger.info("Starting Realms Supervisor: #{inspect(children)}")
+ Supervisor.init(children, strategy: :one_for_one)
+ end
+
+ def start_realm(name, realm) do
+ r_name = realm_name(realm)
+ rs_name = realm_supervisor_name(name)
+ DynamicSupervisor.start_child(rs_name, {Proxy, name: r_name})
+ r_name
+ end
+
+ def realm_supervisor_name(name), do: Module.concat([name, Supervisor])
+ def realm_name(realm), do: Module.concat([__MODULE__, realm])
+end
-defmodule Wampex.Router.Proxy do
+defmodule Wampex.Router.Realms.Proxy do
@moduledoc false
use GenServer
alias Wampex.Router
alias Broker.{Subscribed, Published, Event, Unsubscribed}
alias Dealer.{Invocation, Registered, Result, Unregistered}
- alias Router.Authentication
+ alias Router.{Authentication, Realms}
@enforce_keys [:transport, :transport_pid]
defstruct [
) do
debug("Init")
- {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
- [{:next_event, :internal, :transition}]}
+ {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]}
end
@impl true
authentication: auth,
challenge: challenge,
realm: realm,
+ name: name,
authenticate: {:authenticate, sig, _dets}
}
} = sl
) do
info = auth.parse_challenge(challenge)
- actions = maybe_welcome(auth, session_id, sig, realm, challenge, info, tt, t)
+ {actions, proxy} = maybe_welcome({auth, session_id, sig, realm, name, challenge, info, tt, t})
- {:ok, sl, actions}
+ {:ok, %SL{sl | data: %Sess{sl.data | proxy: proxy}}, actions}
end
@impl true
send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
- {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
- [{:next_event, :internal, :transition}]}
+ {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, [{:next_event, :internal, :transition}]}
end
@impl true
@handle_yield,
_,
@yield,
- %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
- data
+ %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = data
) do
{call_id, _, {pid, node}} =
Enum.find(inv, fn
_ -> true
end)
- {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
- [{:next_event, :internal, :transition}]}
+ {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, [{:next_event, :internal, :transition}]}
end
@impl true
@impl true
def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
- {:ok, %SL{data | data: %Sess{sess | message: message}},
- [{:next_event, :internal, :message_received}]}
+ {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]}
end
@impl true
end
defp maybe_welcome(
- auth,
- session_id,
- sig,
- realm,
- challenge,
- {authid, authrole, authmethod, authprovider},
- tt,
- t
+ {auth, session_id, sig, realm, name, challenge, {authid, authrole, authmethod, authprovider}, tt, t}
) do
case auth.authenticate(sig, realm, authid, challenge) do
true ->
+ proxy = Realms.start_realm(Router.realms_name(name), realm)
+
send_to_peer(
Peer.welcome(%Welcome{
session_id: session_id,
t
)
- [{:next_event, :internal, :transition}]
+ {[{:next_event, :internal, :transition}], proxy}
false ->
- [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]
+ {[{:next_event, :internal, :transition}, {:next_event, :internal, :abort}], nil}
end
end
end
@impl true
- def init(req, %{db: db, proxy: proxy}) do
+ def init(req, %{db: db, name: name}) do
{:ok, serializer, protocol} = get_serializer(req)
req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
- {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
+ {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, name}}
end
@impl true
- def websocket_init({state, db, proxy}) do
- {:ok, session} = start_session(db, proxy)
+ def websocket_init({state, db, name}) do
+ {:ok, session} = start_session(db, name)
Logger.info("Websocket Initialized: #{inspect(state)}")
{:ok, %WebSocket{state | session: session}}
end
{:ok, state}
end
- defp start_session(db, proxy) do
- Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
+ defp start_session(db, name) do
+ Session.start_link(%Session{db: db, name: name, transport: WebSocket, transport_pid: self()})
end
defp get_serializer(req) do
[
# {:cluster_kv, path: "../cluster_kv"},
{:cluster_kv,
- git: "https://gitlab.com/entropealabs/cluster_kv.git",
- tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
+ git: "https://gitlab.com/entropealabs/cluster_kv.git", tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
{:cors_plug, "~> 2.0"},
{:credo, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
{:plug_cowboy, "~> 2.1"},
{:postgrex, ">= 0.0.0"},
{:states_language, "~> 0.2"},
- {:wampex,
- git: "https://gitlab.com/entropealabs/wampex.git",
- tag: "7c5d257937ace7776f61ad63f8590c24ef5e382a"},
+ {:wampex, git: "https://gitlab.com/entropealabs/wampex.git", tag: "7c5d257937ace7776f61ad63f8590c24ef5e382a"},
{:wampex_client,
git: "https://gitlab.com/entropealabs/wampex_client.git",
tag: "59424e0d6a7bbc2646ab1cad9e71a8d27a016ac9",
{:ok, sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
{:ok, sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"})
- {:ok, sub4} =
- Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
+ {:ok, sub4} = Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
{:ok, sub5} =
Client.subscribe(name, %Subscribe{
@tag :client
test "Peer.hello" do
- assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] =
- Peer.hello(%Hello{realm: "test", roles: [Callee]})
+ assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] = Peer.hello(%Hello{realm: "test", roles: [Callee]})
end
@tag :client
@tag :client
test "Subscriber.subscribe" do
- assert [32, %{test: 1}, "test"] =
- Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}})
+ assert [32, %{test: 1}, "test"] = Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}})
end
@tag :client
@tag :router
test "Dealer.registered" do
- assert [65, 123_456, 765_432] =
- Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432})
+ assert [65, 123_456, 765_432] = Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432})
end
@tag :router
@tag :router
test "Dealer.invocation" do
- assert [68, 1234, 1234, %{}, [], %{}] =
- Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234})
+ assert [68, 1234, 1234, %{}, [], %{}] = Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234})
assert [68, 1234, 1234, %{}, ["1"], %{}] =
Dealer.invocation(%Invocation{
@tag :router
test "Broker.subscribed" do
- assert [33, 123_456, 123_456] =
- Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456})
+ assert [33, 123_456, 123_456] = Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456})
end
@tag :router
@tag :router
test "Broker.published" do
- assert [17, 123_456, 654_321] =
- Broker.published(%Published{request_id: 123_456, publication_id: 654_321})
+ assert [17, 123_456, 654_321] = Broker.published(%Published{request_id: 123_456, publication_id: 654_321})
end
@tag :client
caller_name = TestExistCaller
Client.start_link(name: caller_name, session: @session)
- assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"},
- [],
- %{}} =
+ assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"}, [], %{}} =
Client.send_request(
caller_name,
Caller.call(%Call{