alias Wampex.Roles.Caller
alias Wampex.Roles.Caller.Call
alias Wampex.Roles.Dealer.Result
+ alias Wampex.Roles.Peer
alias Wampex.Roles.Peer.Error
alias Wampex.Roles.Publisher
alias Wampex.Roles.Publisher.Publish
{name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data,
reconnect}
) do
- session = session_name(name)
subscriber_registry = subscriber_registry_name(name)
callee_registry = callee_registry_name(name)
- transport = transport_name(name)
- session_data = %Sess{session_data | name: name, transport_pid: transport}
+ session_data = %Sess{session_data | name: name, roles: [Peer | session_data.roles]}
children = [
{Registry,
[keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
{Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]},
- {Sess, [{:local, session}, session_data, []]},
{t,
- [
- url: url,
- session: session,
- protocol: p,
- serializer: s,
- reconnect: reconnect,
- opts: [name: transport]
- ]}
+ name: name,
+ url: url,
+ session_data: session_data,
+ protocol: p,
+ serializer: s,
+ reconnect: reconnect}
]
Supervisor.init(children, strategy: :one_for_all, max_restarts: 10)
def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
@spec callee_registry_name(module()) :: module()
def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
- @spec transport_name(module()) :: module()
- def transport_name(name), do: Module.concat([name, Transport])
@spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
{:ok, integer()}
@established "Established"
@init "Init"
@challenge "Challenge"
+ @welcome "Welcome"
@abort "Abort"
@goodbye "Goodbye"
@event "Event"
@handshake "Handshake"
## Resources
- @init_transport "InitTransport"
- @wait_transport "WaitForTransport"
@hello "Hello"
@handle_challenge "HandleChallenge"
+ @handle_welcome "HandleWelcome"
@handle_established "HandleEstablished"
@handle_abort "HandleAbort"
@handle_goodbye "HandleGoodbye"
{:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []}
end
- @impl true
- def handle_resource(
- @init_transport,
- _,
- @wait_transport,
- data
- ) do
- Logger.debug("Waiting for transport to connect...")
- {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
- end
-
@impl true
def handle_resource(
@hello,
{:ok, sl, [{:next_event, :internal, :challenged}]}
end
+ @impl true
+ def handle_resource(
+ @handle_welcome,
+ _,
+ @welcome,
+ %SL{data: %Sess{name: name}} = sl
+ ) do
+ sub = Client.subscriber_registry_name(name)
+ reg = Client.callee_registry_name(name)
+ subscribers = Registry.select(sub, [{{:_, :"$1", :_}, [], [:"$1"]}])
+ callees = Registry.select(reg, [{{:_, :"$1", :_}, [], [:"$1"]}])
+ pids = Enum.uniq(subscribers ++ callees)
+
+ Enum.each(pids, fn pid ->
+ send(pid, {:connected, name})
+ end)
+
+ {:ok, sl, [{:next_event, :internal, :established}]}
+ end
+
@impl true
def handle_resource(
@goodbye,
[{:next_event, :internal, :message_received}]}
end
- @impl true
- def handle_info({:connected, true}, _, _s) do
- Logger.info("Reconnected, reset.")
- exit(:reset)
- end
-
defp handle_response(nil, actions, requests, _), do: {requests, actions}
defp handle_response(id, actions, requests, response) do
defmodule Wampex.Client.Transports.WebSocket do
use WebSockex
- @behaviour Wampex.Client.Transport
+ # @behaviour Wampex.Client.Transport
+ alias Wampex.Client
+ alias Wampex.Client.Session
alias WebSockex.Conn
require Logger
@ping_interval 20_000
@max_reconnect_interval 2 * 60_000
- @impl true
def send_request(t, data) do
WebSockex.cast(t, {:send_request, data})
end
- @impl true
def start_link(
+ name: name,
url: url,
- session: session,
+ session_data: session_data,
protocol: protocol,
serializer: serializer,
- reconnect: reconnect,
- opts: opts
+ reconnect: reconnect
) do
- opts = Keyword.put(opts, :handle_initial_conn_failure, true)
+ conn = Conn.new(url, extra_headers: [{@header, protocol}])
- url
- |> Conn.new(extra_headers: [{@header, protocol}])
- |> WebSockex.start_link(
+ WebSockex.start_link(
+ conn,
__MODULE__,
%{
- session: session,
+ name: name,
+ session: nil,
+ session_data: session_data,
serializer: serializer,
connected: false,
reconnect: reconnect,
reconnect_attempts: 1
},
- opts
+ handle_initial_conn_failure: true
)
end
end
@impl true
- def handle_disconnect(_status, %{reconnect_attempts: attempts} = state) do
+ def handle_disconnect(_status, %{session: sess, reconnect_attempts: attempts} = state) do
+ stop_session(sess)
delay = get_backoff_delay(attempts)
Logger.warn("Connection disconnected. Attempting reconnect in #{delay}")
:timer.sleep(delay)
end
@impl true
- def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
- Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}")
+ def handle_connect(
+ %Conn{host: h, port: p, path: pa, query: q} = _conn,
+ %{name: name, session_data: sd} = state
+ ) do
+ Logger.info("Connected to #{h}:#{p}#{pa}?#{q}")
Process.send_after(self(), :ping, @ping_interval)
- send(state.session, {:connected, true})
- {:ok, %{state | reconnect_attempts: 1, connected: true}}
+
+ {:ok, sess} =
+ Session.start_link(
+ {:local, Client.session_name(name)},
+ %Session{sd | transport_pid: self()},
+ []
+ )
+
+ {:ok, %{state | session: sess, reconnect_attempts: 1, connected: true}}
end
@impl true
end
@impl true
- def handle_frame({type, message}, %{serializer: s} = state) do
+ def handle_frame({_type, message}, %{serializer: s, session: sess} = state) do
message = s.deserialize!(message)
- Logger.debug("Received #{type} data: #{inspect(message)}")
- send(state.session, {:set_message, message})
+ send(sess, {:set_message, message})
{:ok, state}
end
+
+ def stop_session(nil), do: :noop
+
+ def stop_session(pid) do
+ case Process.alive?(pid) do
+ true -> Process.exit(pid, :normal)
+ false -> :noop
+ end
+ end
end
{:msgpack, "~> 0.7.0"},
{:pbkdf2, "~> 2.0"},
{:states_language, "~> 0.2"},
+ # {:wampex, path: "../wampex"},
{:wampex,
git: "https://gitlab.com/entropealabs/wampex.git",
- tag: "61d834a784f5f14ec90508cf7556a5ebaec113c3"},
+ tag: "7d656d9748cb0e4d08c625d87e315839afecb4b2"},
{:websockex, "~> 0.4.2"}
]
end
{
"Comment": "Session State Machine",
- "StartAt": "WaitForTransport",
+ "StartAt": "Init",
"States": {
- "WaitForTransport": {
- "Type": "Task",
- "Resource": "InitTransport",
- "TransitionEvent": "{:connected, true}",
- "Next": "Init",
- "Catch": [
- {
- "ErrorEquals": ["{:connected, false}"],
- "Next": "Abort"
- }
- ]
- },
"Init": {
"Type": "Task",
"Resource": "Hello",
"StringEquals": ":established",
"Next": "Established"
},
+ {
+ "StringEquals": ":welcome",
+ "Next": "Welcome"
+ },
{
"StringEquals": ":event",
"Next": "Event"
}
]
},
+ "Welcome": {
+ "Type": "Task",
+ "Resource": "HandleWelcome",
+ "TransitionEvent": ":established",
+ "Next": "Established"
+ },
"Interrupt": {
"Type": "Task",
"Resource": "HandleInterrupt",
+++ /dev/null
-defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do
- use Ecto.Migration
-
- def change do
- create table("realms", primary_key: false) do
- add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
- add(:uri, :string, null: false)
- add(:parent, :string, null: true)
- timestamps()
- end
-
- create(unique_index(:realms, [:uri]))
-
- create table("users", primary_key: false) do
- add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
- add(:authid, :string, null: false)
- add(:password, :string, null: false)
- add(:salt, :string, null: false)
- add(:iterations, :integer, null: false)
- add(:keylen, :integer, null: false)
- add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false)
- timestamps()
- end
-
- create(index(:users, [:authid]))
- create(unique_index(:users, [:authid, :realm_id]))
- end
-end
+++ /dev/null
-{
- "Comment": "Router Session State Machine",
- "StartAt": "Init",
- "States": {
- "Init": {
- "Type": "Task",
- "Resource": "HandleInit",
- "Next": "Established"
- },
- "Established": {
- "Type": "Choice",
- "Resource": "HandleEstablished",
- "Choices": [
- {
- "StringEquals": ":message_received",
- "Next": "Message"
- },
- {
- "StringEquals": ":goodbye",
- "Next": "GoodBye"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- }
- ]
- },
- "Message": {
- "Type": "Choice",
- "Resource": "HandleMessage",
- "Choices": [
- {
- "StringEquals": ":hello",
- "Next": "Hello"
- },
- {
- "StringEquals": ":authenticate",
- "Next": "Authenticate"
- },
- {
- "StringEquals": ":call",
- "Next": "Call"
- },
- {
- "StringEquals": ":yield",
- "Next": "Yield"
- },
- {
- "StringEquals": ":register",
- "Next": "Register"
- },
- {
- "StringEquals": ":unregister",
- "Next": "Unregister"
- },
- {
- "StringEquals": ":subscribe",
- "Next": "Subscribe"
- },
- {
- "StringEquals": ":unsubscribe",
- "Next": "Unsubscribe"
- },
- {
- "StringEquals": ":publish",
- "Next": "Publish"
- },
- {
- "StringEquals": ":error",
- "Next": "Error"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- },
- {
- "StringEquals": ":goodbye",
- "Next": "GoodBye"
- },
- {
- "StringEquals": ":cancel",
- "Next": "Cancel"
- }
- ]
- },
- "Hello": {
- "Type": "Task",
- "Resource": "HandleHello",
- "Next": "Established"
- },
- "Authenticate": {
- "Type": "Task",
- "Resource": "HandleAuthenticate",
- "Next": "Established"
- },
- "Call": {
- "Type": "Task",
- "Resource": "HandleCall",
- "Next": "Established",
- "Catch": [
- {
- "ErrorEquals": ["{:error, :no_live_callees}"],
- "Next": "Error"
- }
- ]
- },
- "Yield": {
- "Type": "Task",
- "Resource": "HandleYield",
- "Next": "Established"
- },
- "Register":{
- "Type": "Task",
- "Resource": "HandleRegister",
- "Next": "Established"
- },
- "Unregister":{
- "Type": "Task",
- "Resource": "HandleUnregister",
- "Next": "Established"
- },
- "Subscribe":{
- "Type": "Task",
- "Resource": "HandleSubscribe",
- "Next": "Established"
- },
- "Unsubscribe":{
- "Type": "Task",
- "Resource": "HandleUnsubscribe",
- "Next": "Established"
- },
- "Publish":{
- "Type": "Task",
- "Resource": "HandlePublish",
- "Next": "Established"
- },
- "Error":{
- "Type": "Task",
- "Resource": "HandleError",
- "Next": "Established"
- },
- "GoodBye": {
- "Type": "Task",
- "Resource": "HandleGoodbye",
- "End": true
- },
- "Abort": {
- "Type": "Task",
- "Resource": "HandleAbort",
- "End": true
- }
- }
-}