]> Entropealabs - wampex_client.git/commitdiff
handle disconnects
authorChristopher <chris@entropealabs.com>
Sat, 4 Apr 2020 15:21:46 +0000 (10:21 -0500)
committerChristopher <chris@entropealabs.com>
Sat, 4 Apr 2020 15:21:46 +0000 (10:21 -0500)
lib/client.ex
lib/client/session.ex
lib/client/transports/web_socket.ex

index b9815b4fe878a1463b6b150cdb768c7dabcbab37..b6fe5a0c44046d4be76ddc5a4cba1fed9bd8dfdf 100644 (file)
@@ -16,16 +16,19 @@ defmodule Wampex.Client do
   alias Wampex.Roles.Publisher.Publish
   alias Wampex.Roles.Subscriber.Subscribe
 
-  @spec start_link(name: atom(), session_data: Sess.t()) ::
+  @spec start_link(name: atom(), session_data: Sess.t(), reconnect: boolean()) ::
           {:ok, pid()}
           | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
-  def start_link(name: name, session: session_data) when is_atom(name) do
-    Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
+  def start_link(name: name, session: session_data, reconnect: reconnect) when is_atom(name) do
+    Supervisor.start_link(__MODULE__, {name, session_data, reconnect}, name: name)
   end
 
-  @spec init({atom(), Sess.t()}) ::
+  @spec init({atom(), Sess.t(), boolean()}) ::
           {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
-  def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
+  def init(
+        {name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data,
+         reconnect}
+      ) do
     session = session_name(name)
     subscriber_registry = subscriber_registry_name(name)
     callee_registry = callee_registry_name(name)
@@ -33,14 +36,22 @@ defmodule Wampex.Client do
     session_data = %Sess{session_data | name: name, transport_pid: transport}
 
     children = [
-      {Sess, [{:local, session}, session_data, []]},
-      {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
       {Registry,
        [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
-      {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
+      {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]},
+      {Sess, [{:local, session}, session_data, []]},
+      {t,
+       [
+         url: url,
+         session: session,
+         protocol: p,
+         serializer: s,
+         reconnect: reconnect,
+         opts: [name: transport]
+       ]}
     ]
 
-    Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
+    Supervisor.init(children, strategy: :one_for_all, max_restarts: 10)
   end
 
   @spec session_name(module()) :: module()
@@ -55,17 +66,27 @@ defmodule Wampex.Client do
   @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
           {:ok, integer()}
   def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
-    {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
-    Registry.register(subscriber_registry_name(name), id, t)
-    {:ok, id}
+    case sync(name, Subscriber.subscribe(sub), timeout) do
+      {:ok, id} ->
+        Registry.register(subscriber_registry_name(name), id, t)
+        {:ok, id}
+
+      er ->
+        er
+    end
   end
 
   @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
           {:ok, integer()}
   def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
-    {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
-    Registry.register(callee_registry_name(name), id, p)
-    {:ok, id}
+    case sync(name, Callee.register(reg), timeout) do
+      {:ok, id} ->
+        Registry.register(callee_registry_name(name), id, p)
+        {:ok, id}
+
+      er ->
+        er
+    end
   end
 
   @spec yield(name :: module(), yield :: Yield.t()) :: :ok
@@ -91,11 +112,32 @@ defmodule Wampex.Client do
   @spec sync(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
           term()
   def sync(name, request, timeout \\ 5000) do
-    Sess.send_request(session_name(name), request, timeout)
+    case session_exists(name) do
+      false ->
+        %Error{error: "wamp.error.no_session"}
+
+      sess_name ->
+        Sess.send_request(sess_name, request, timeout)
+    end
   end
 
   @spec cast(name :: module(), Wampex.message()) :: :ok
   def cast(name, request) do
-    Sess.cast_send_request(session_name(name), request)
+    case session_exists(name) do
+      false ->
+        %Error{error: "wamp.error.no_session"}
+
+      sess_name ->
+        Sess.cast_send_request(sess_name, request)
+    end
+  end
+
+  defp session_exists(name) do
+    name = session_name(name)
+
+    case Process.whereis(name) do
+      nil -> false
+      _pid -> name
+    end
   end
 end
index 8222a53525ba3083d333ef617b69d7bbd5689ac5..fcf1b3c1f3eab9b638f7304923bc89349575be75 100644 (file)
@@ -356,6 +356,12 @@ defmodule Wampex.Client.Session do
      [{:next_event, :internal, :message_received}]}
   end
 
+  @impl true
+  def handle_info({:connected, true}, _, _s) do
+    Logger.info("Reconnected, reset.")
+    exit(:reset)
+  end
+
   defp handle_response(nil, actions, requests, _), do: {requests, actions}
 
   defp handle_response(id, actions, requests, response) do
index 00b8a0b0c92f958f81064d8b8f0cbc52538284d4..0b7db598313cbff69643c980dee92efb6b531d29 100644 (file)
@@ -9,6 +9,7 @@ defmodule Wampex.Client.Transports.WebSocket do
 
   @header "sec-websocket-protocol"
   @ping_interval 20_000
+  @max_reconnect_interval 2 * 60_000
 
   @impl true
   def send_request(t, data) do
@@ -21,23 +22,50 @@ defmodule Wampex.Client.Transports.WebSocket do
         session: session,
         protocol: protocol,
         serializer: serializer,
+        reconnect: reconnect,
         opts: opts
       ) do
+    opts = Keyword.put(opts, :handle_initial_conn_failure, true)
+
     url
     |> Conn.new(extra_headers: [{@header, protocol}])
     |> WebSockex.start_link(
       __MODULE__,
-      %{session: session, serializer: serializer, connected: false},
+      %{
+        session: session,
+        serializer: serializer,
+        connected: false,
+        reconnect: reconnect,
+        reconnect_attempts: 1
+      },
       opts
     )
   end
 
+  defp get_backoff_delay(attempts) do
+    backoff = :math.pow(2, attempts) * 100
+    ceil(min(backoff, @max_reconnect_interval))
+  end
+
+  @impl true
+  def handle_disconnect(_status, %{reconnect: false} = state) do
+    {:ok, state}
+  end
+
+  @impl true
+  def handle_disconnect(_status, %{reconnect_attempts: attempts} = state) do
+    delay = get_backoff_delay(attempts)
+    Logger.warn("Connection disconnected. Attempting reconnect in #{delay}")
+    :timer.sleep(delay)
+    {:reconnect, %{state | reconnect_attempts: attempts + 1}}
+  end
+
   @impl true
   def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
     Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}")
     Process.send_after(self(), :ping, @ping_interval)
     send(state.session, {:connected, true})
-    {:ok, %{state | connected: true}}
+    {:ok, %{state | reconnect_attempts: 1, connected: true}}
   end
 
   @impl true