alias Wampex.Roles.Publisher.Publish
alias Wampex.Roles.Subscriber.Subscribe
- @spec start_link(name: atom(), session_data: Sess.t()) ::
+ @spec start_link(name: atom(), session_data: Sess.t(), reconnect: boolean()) ::
{:ok, pid()}
| {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
- def start_link(name: name, session: session_data) when is_atom(name) do
- Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
+ def start_link(name: name, session: session_data, reconnect: reconnect) when is_atom(name) do
+ Supervisor.start_link(__MODULE__, {name, session_data, reconnect}, name: name)
end
- @spec init({atom(), Sess.t()}) ::
+ @spec init({atom(), Sess.t(), boolean()}) ::
{:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
- def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
+ def init(
+ {name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data,
+ reconnect}
+ ) do
session = session_name(name)
subscriber_registry = subscriber_registry_name(name)
callee_registry = callee_registry_name(name)
session_data = %Sess{session_data | name: name, transport_pid: transport}
children = [
- {Sess, [{:local, session}, session_data, []]},
- {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
{Registry,
[keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
- {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
+ {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]},
+ {Sess, [{:local, session}, session_data, []]},
+ {t,
+ [
+ url: url,
+ session: session,
+ protocol: p,
+ serializer: s,
+ reconnect: reconnect,
+ opts: [name: transport]
+ ]}
]
- Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
+ Supervisor.init(children, strategy: :one_for_all, max_restarts: 10)
end
@spec session_name(module()) :: module()
@spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
{:ok, integer()}
def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
- Registry.register(subscriber_registry_name(name), id, t)
- {:ok, id}
+ case sync(name, Subscriber.subscribe(sub), timeout) do
+ {:ok, id} ->
+ Registry.register(subscriber_registry_name(name), id, t)
+ {:ok, id}
+
+ er ->
+ er
+ end
end
@spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
{:ok, integer()}
def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
- Registry.register(callee_registry_name(name), id, p)
- {:ok, id}
+ case sync(name, Callee.register(reg), timeout) do
+ {:ok, id} ->
+ Registry.register(callee_registry_name(name), id, p)
+ {:ok, id}
+
+ er ->
+ er
+ end
end
@spec yield(name :: module(), yield :: Yield.t()) :: :ok
@spec sync(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
term()
def sync(name, request, timeout \\ 5000) do
- Sess.send_request(session_name(name), request, timeout)
+ case session_exists(name) do
+ false ->
+ %Error{error: "wamp.error.no_session"}
+
+ sess_name ->
+ Sess.send_request(sess_name, request, timeout)
+ end
end
@spec cast(name :: module(), Wampex.message()) :: :ok
def cast(name, request) do
- Sess.cast_send_request(session_name(name), request)
+ case session_exists(name) do
+ false ->
+ %Error{error: "wamp.error.no_session"}
+
+ sess_name ->
+ Sess.cast_send_request(sess_name, request)
+ end
+ end
+
+ defp session_exists(name) do
+ name = session_name(name)
+
+ case Process.whereis(name) do
+ nil -> false
+ _pid -> name
+ end
end
end
@header "sec-websocket-protocol"
@ping_interval 20_000
+ @max_reconnect_interval 2 * 60_000
@impl true
def send_request(t, data) do
session: session,
protocol: protocol,
serializer: serializer,
+ reconnect: reconnect,
opts: opts
) do
+ opts = Keyword.put(opts, :handle_initial_conn_failure, true)
+
url
|> Conn.new(extra_headers: [{@header, protocol}])
|> WebSockex.start_link(
__MODULE__,
- %{session: session, serializer: serializer, connected: false},
+ %{
+ session: session,
+ serializer: serializer,
+ connected: false,
+ reconnect: reconnect,
+ reconnect_attempts: 1
+ },
opts
)
end
+ defp get_backoff_delay(attempts) do
+ backoff = :math.pow(2, attempts) * 100
+ ceil(min(backoff, @max_reconnect_interval))
+ end
+
+ @impl true
+ def handle_disconnect(_status, %{reconnect: false} = state) do
+ {:ok, state}
+ end
+
+ @impl true
+ def handle_disconnect(_status, %{reconnect_attempts: attempts} = state) do
+ delay = get_backoff_delay(attempts)
+ Logger.warn("Connection disconnected. Attempting reconnect in #{delay}")
+ :timer.sleep(delay)
+ {:reconnect, %{state | reconnect_attempts: attempts + 1}}
+ end
+
@impl true
def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}")
Process.send_after(self(), :ping, @ping_interval)
send(state.session, {:connected, true})
- {:ok, %{state | connected: true}}
+ {:ok, %{state | reconnect_attempts: 1, connected: true}}
end
@impl true