From: Christopher Date: Sat, 4 Apr 2020 15:21:46 +0000 (-0500) Subject: handle disconnects X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=855fe4dd06bc34ced91158b286eeeb6ee74db6eb;p=wampex_client.git handle disconnects --- diff --git a/lib/client.ex b/lib/client.ex index b9815b4..b6fe5a0 100644 --- a/lib/client.ex +++ b/lib/client.ex @@ -16,16 +16,19 @@ defmodule Wampex.Client do alias Wampex.Roles.Publisher.Publish alias Wampex.Roles.Subscriber.Subscribe - @spec start_link(name: atom(), session_data: Sess.t()) :: + @spec start_link(name: atom(), session_data: Sess.t(), reconnect: boolean()) :: {:ok, pid()} | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} - def start_link(name: name, session: session_data) when is_atom(name) do - Supervisor.start_link(__MODULE__, {name, session_data}, name: name) + def start_link(name: name, session: session_data, reconnect: reconnect) when is_atom(name) do + Supervisor.start_link(__MODULE__, {name, session_data, reconnect}, name: name) end - @spec init({atom(), Sess.t()}) :: + @spec init({atom(), Sess.t(), boolean()}) :: {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore - def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do + def init( + {name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data, + reconnect} + ) do session = session_name(name) subscriber_registry = subscriber_registry_name(name) callee_registry = callee_registry_name(name) @@ -33,14 +36,22 @@ defmodule Wampex.Client do session_data = %Sess{session_data | name: name, transport_pid: transport} children = [ - {Sess, [{:local, session}, session_data, []]}, - {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]}, {Registry, [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]}, - {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]} + {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}, + {Sess, [{:local, session}, session_data, []]}, + {t, + [ + url: url, + session: session, + protocol: p, + serializer: s, + reconnect: reconnect, + opts: [name: transport] + ]} ] - Supervisor.init(children, strategy: :one_for_all, max_restarts: 0) + Supervisor.init(children, strategy: :one_for_all, max_restarts: 10) end @spec session_name(module()) :: module() @@ -55,17 +66,27 @@ defmodule Wampex.Client do @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) :: {:ok, integer()} def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do - {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout) - Registry.register(subscriber_registry_name(name), id, t) - {:ok, id} + case sync(name, Subscriber.subscribe(sub), timeout) do + {:ok, id} -> + Registry.register(subscriber_registry_name(name), id, t) + {:ok, id} + + er -> + er + end end @spec register(name :: module(), register :: Register.t(), timeout :: integer()) :: {:ok, integer()} def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do - {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout) - Registry.register(callee_registry_name(name), id, p) - {:ok, id} + case sync(name, Callee.register(reg), timeout) do + {:ok, id} -> + Registry.register(callee_registry_name(name), id, p) + {:ok, id} + + er -> + er + end end @spec yield(name :: module(), yield :: Yield.t()) :: :ok @@ -91,11 +112,32 @@ defmodule Wampex.Client do @spec sync(name :: module(), request :: Wampex.message(), timeout :: integer()) :: term() def sync(name, request, timeout \\ 5000) do - Sess.send_request(session_name(name), request, timeout) + case session_exists(name) do + false -> + %Error{error: "wamp.error.no_session"} + + sess_name -> + Sess.send_request(sess_name, request, timeout) + end end @spec cast(name :: module(), Wampex.message()) :: :ok def cast(name, request) do - Sess.cast_send_request(session_name(name), request) + case session_exists(name) do + false -> + %Error{error: "wamp.error.no_session"} + + sess_name -> + Sess.cast_send_request(sess_name, request) + end + end + + defp session_exists(name) do + name = session_name(name) + + case Process.whereis(name) do + nil -> false + _pid -> name + end end end diff --git a/lib/client/session.ex b/lib/client/session.ex index 8222a53..fcf1b3c 100644 --- a/lib/client/session.ex +++ b/lib/client/session.ex @@ -356,6 +356,12 @@ defmodule Wampex.Client.Session do [{:next_event, :internal, :message_received}]} end + @impl true + def handle_info({:connected, true}, _, _s) do + Logger.info("Reconnected, reset.") + exit(:reset) + end + defp handle_response(nil, actions, requests, _), do: {requests, actions} defp handle_response(id, actions, requests, response) do diff --git a/lib/client/transports/web_socket.ex b/lib/client/transports/web_socket.ex index 00b8a0b..0b7db59 100644 --- a/lib/client/transports/web_socket.ex +++ b/lib/client/transports/web_socket.ex @@ -9,6 +9,7 @@ defmodule Wampex.Client.Transports.WebSocket do @header "sec-websocket-protocol" @ping_interval 20_000 + @max_reconnect_interval 2 * 60_000 @impl true def send_request(t, data) do @@ -21,23 +22,50 @@ defmodule Wampex.Client.Transports.WebSocket do session: session, protocol: protocol, serializer: serializer, + reconnect: reconnect, opts: opts ) do + opts = Keyword.put(opts, :handle_initial_conn_failure, true) + url |> Conn.new(extra_headers: [{@header, protocol}]) |> WebSockex.start_link( __MODULE__, - %{session: session, serializer: serializer, connected: false}, + %{ + session: session, + serializer: serializer, + connected: false, + reconnect: reconnect, + reconnect_attempts: 1 + }, opts ) end + defp get_backoff_delay(attempts) do + backoff = :math.pow(2, attempts) * 100 + ceil(min(backoff, @max_reconnect_interval)) + end + + @impl true + def handle_disconnect(_status, %{reconnect: false} = state) do + {:ok, state} + end + + @impl true + def handle_disconnect(_status, %{reconnect_attempts: attempts} = state) do + delay = get_backoff_delay(attempts) + Logger.warn("Connection disconnected. Attempting reconnect in #{delay}") + :timer.sleep(delay) + {:reconnect, %{state | reconnect_attempts: attempts + 1}} + end + @impl true def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}") Process.send_after(self(), :ping, @ping_interval) send(state.session, {:connected, true}) - {:ok, %{state | connected: true}} + {:ok, %{state | reconnect_attempts: 1, connected: true}} end @impl true