]> Entropealabs - wampex_client.git/commitdiff
add welcome state, send event when session reconnects
authorChristopher <chris@entropealabs.com>
Sun, 5 Apr 2020 01:41:41 +0000 (20:41 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 5 Apr 2020 01:41:41 +0000 (20:41 -0500)
lib/client.ex
lib/client/session.ex
lib/client/transports/web_socket.ex
mix.exs
priv/client.json
priv/repo/migrations/20200319210508_create_realms.exs [deleted file]
priv/router.json [deleted file]

index b6fe5a0c44046d4be76ddc5a4cba1fed9bd8dfdf..ac5cc752ec343397d17027a14571ffb4714297b3 100644 (file)
@@ -11,6 +11,7 @@ defmodule Wampex.Client do
   alias Wampex.Roles.Caller
   alias Wampex.Roles.Caller.Call
   alias Wampex.Roles.Dealer.Result
+  alias Wampex.Roles.Peer
   alias Wampex.Roles.Peer.Error
   alias Wampex.Roles.Publisher
   alias Wampex.Roles.Publisher.Publish
@@ -29,26 +30,21 @@ defmodule Wampex.Client do
         {name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data,
          reconnect}
       ) do
-    session = session_name(name)
     subscriber_registry = subscriber_registry_name(name)
     callee_registry = callee_registry_name(name)
-    transport = transport_name(name)
-    session_data = %Sess{session_data | name: name, transport_pid: transport}
+    session_data = %Sess{session_data | name: name, roles: [Peer | session_data.roles]}
 
     children = [
       {Registry,
        [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
       {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]},
-      {Sess, [{:local, session}, session_data, []]},
       {t,
-       [
-         url: url,
-         session: session,
-         protocol: p,
-         serializer: s,
-         reconnect: reconnect,
-         opts: [name: transport]
-       ]}
+       name: name,
+       url: url,
+       session_data: session_data,
+       protocol: p,
+       serializer: s,
+       reconnect: reconnect}
     ]
 
     Supervisor.init(children, strategy: :one_for_all, max_restarts: 10)
@@ -60,8 +56,6 @@ defmodule Wampex.Client do
   def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
   @spec callee_registry_name(module()) :: module()
   def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
-  @spec transport_name(module()) :: module()
-  def transport_name(name), do: Module.concat([name, Transport])
 
   @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
           {:ok, integer()}
index fcf1b3c1f3eab9b638f7304923bc89349575be75..8e1b6d5f102351184c88fad91701930d63eb5552 100644 (file)
@@ -71,6 +71,7 @@ defmodule Wampex.Client.Session do
   @established "Established"
   @init "Init"
   @challenge "Challenge"
+  @welcome "Welcome"
   @abort "Abort"
   @goodbye "Goodbye"
   @event "Event"
@@ -79,10 +80,9 @@ defmodule Wampex.Client.Session do
   @handshake "Handshake"
 
   ## Resources
-  @init_transport "InitTransport"
-  @wait_transport "WaitForTransport"
   @hello "Hello"
   @handle_challenge "HandleChallenge"
+  @handle_welcome "HandleWelcome"
   @handle_established "HandleEstablished"
   @handle_abort "HandleAbort"
   @handle_goodbye "HandleGoodbye"
@@ -184,17 +184,6 @@ defmodule Wampex.Client.Session do
     {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []}
   end
 
-  @impl true
-  def handle_resource(
-        @init_transport,
-        _,
-        @wait_transport,
-        data
-      ) do
-    Logger.debug("Waiting for transport to connect...")
-    {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
-  end
-
   @impl true
   def handle_resource(
         @hello,
@@ -233,6 +222,26 @@ defmodule Wampex.Client.Session do
     {:ok, sl, [{:next_event, :internal, :challenged}]}
   end
 
+  @impl true
+  def handle_resource(
+        @handle_welcome,
+        _,
+        @welcome,
+        %SL{data: %Sess{name: name}} = sl
+      ) do
+    sub = Client.subscriber_registry_name(name)
+    reg = Client.callee_registry_name(name)
+    subscribers = Registry.select(sub, [{{:_, :"$1", :_}, [], [:"$1"]}])
+    callees = Registry.select(reg, [{{:_, :"$1", :_}, [], [:"$1"]}])
+    pids = Enum.uniq(subscribers ++ callees)
+
+    Enum.each(pids, fn pid ->
+      send(pid, {:connected, name})
+    end)
+
+    {:ok, sl, [{:next_event, :internal, :established}]}
+  end
+
   @impl true
   def handle_resource(
         @goodbye,
@@ -356,12 +365,6 @@ defmodule Wampex.Client.Session do
      [{:next_event, :internal, :message_received}]}
   end
 
-  @impl true
-  def handle_info({:connected, true}, _, _s) do
-    Logger.info("Reconnected, reset.")
-    exit(:reset)
-  end
-
   defp handle_response(nil, actions, requests, _), do: {requests, actions}
 
   defp handle_response(id, actions, requests, response) do
index 0b7db598313cbff69643c980dee92efb6b531d29..1c37e2f69827422e2ac57c71b9a79e455edf4382 100644 (file)
@@ -1,8 +1,10 @@
 defmodule Wampex.Client.Transports.WebSocket do
   use WebSockex
 
-  @behaviour Wampex.Client.Transport
+  @behaviour Wampex.Client.Transport
 
+  alias Wampex.Client
+  alias Wampex.Client.Session
   alias WebSockex.Conn
 
   require Logger
@@ -11,34 +13,33 @@ defmodule Wampex.Client.Transports.WebSocket do
   @ping_interval 20_000
   @max_reconnect_interval 2 * 60_000
 
-  @impl true
   def send_request(t, data) do
     WebSockex.cast(t, {:send_request, data})
   end
 
-  @impl true
   def start_link(
+        name: name,
         url: url,
-        session: session,
+        session_data: session_data,
         protocol: protocol,
         serializer: serializer,
-        reconnect: reconnect,
-        opts: opts
+        reconnect: reconnect
       ) do
-    opts = Keyword.put(opts, :handle_initial_conn_failure, true)
+    conn = Conn.new(url, extra_headers: [{@header, protocol}])
 
-    url
-    |> Conn.new(extra_headers: [{@header, protocol}])
-    |> WebSockex.start_link(
+    WebSockex.start_link(
+      conn,
       __MODULE__,
       %{
-        session: session,
+        name: name,
+        session: nil,
+        session_data: session_data,
         serializer: serializer,
         connected: false,
         reconnect: reconnect,
         reconnect_attempts: 1
       },
-      opts
+      handle_initial_conn_failure: true
     )
   end
 
@@ -53,7 +54,8 @@ defmodule Wampex.Client.Transports.WebSocket do
   end
 
   @impl true
-  def handle_disconnect(_status, %{reconnect_attempts: attempts} = state) do
+  def handle_disconnect(_status, %{session: sess, reconnect_attempts: attempts} = state) do
+    stop_session(sess)
     delay = get_backoff_delay(attempts)
     Logger.warn("Connection disconnected. Attempting reconnect in #{delay}")
     :timer.sleep(delay)
@@ -61,11 +63,21 @@ defmodule Wampex.Client.Transports.WebSocket do
   end
 
   @impl true
-  def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
-    Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}")
+  def handle_connect(
+        %Conn{host: h, port: p, path: pa, query: q} = _conn,
+        %{name: name, session_data: sd} = state
+      ) do
+    Logger.info("Connected to #{h}:#{p}#{pa}?#{q}")
     Process.send_after(self(), :ping, @ping_interval)
-    send(state.session, {:connected, true})
-    {:ok, %{state | reconnect_attempts: 1, connected: true}}
+
+    {:ok, sess} =
+      Session.start_link(
+        {:local, Client.session_name(name)},
+        %Session{sd | transport_pid: self()},
+        []
+      )
+
+    {:ok, %{state | session: sess, reconnect_attempts: 1, connected: true}}
   end
 
   @impl true
@@ -97,10 +109,18 @@ defmodule Wampex.Client.Transports.WebSocket do
   end
 
   @impl true
-  def handle_frame({type, message}, %{serializer: s} = state) do
+  def handle_frame({_type, message}, %{serializer: s, session: sess} = state) do
     message = s.deserialize!(message)
-    Logger.debug("Received #{type} data: #{inspect(message)}")
-    send(state.session, {:set_message, message})
+    send(sess, {:set_message, message})
     {:ok, state}
   end
+
+  def stop_session(nil), do: :noop
+
+  def stop_session(pid) do
+    case Process.alive?(pid) do
+      true -> Process.exit(pid, :normal)
+      false -> :noop
+    end
+  end
 end
diff --git a/mix.exs b/mix.exs
index 2a0dbb8a063b8866cab2948558cf833508400d91..82230374c1ddb34a9baf06672969a35ed93d69f6 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -45,9 +45,10 @@ defmodule Wampex.Client.MixProject do
       {:msgpack, "~> 0.7.0"},
       {:pbkdf2, "~> 2.0"},
       {:states_language, "~> 0.2"},
+      # {:wampex, path: "../wampex"},
       {:wampex,
        git: "https://gitlab.com/entropealabs/wampex.git",
-       tag: "61d834a784f5f14ec90508cf7556a5ebaec113c3"},
+       tag: "7d656d9748cb0e4d08c625d87e315839afecb4b2"},
       {:websockex, "~> 0.4.2"}
     ]
   end
index b09b052d12b4c755837417b227ca0814f9b8ee61..5c6795e188617fc82d86dab2f7fc8cf0557f84df 100644 (file)
@@ -1,19 +1,7 @@
 {
   "Comment": "Session State Machine",
-  "StartAt": "WaitForTransport",
+  "StartAt": "Init",
   "States": {
-    "WaitForTransport": {
-      "Type": "Task",
-      "Resource": "InitTransport",
-      "TransitionEvent": "{:connected, true}",
-      "Next": "Init",
-      "Catch": [
-        {
-          "ErrorEquals": ["{:connected, false}"],
-          "Next": "Abort"
-        }
-      ]
-    },
     "Init": {
       "Type": "Task",
       "Resource": "Hello",
           "StringEquals": ":established",
           "Next": "Established"
         },
+        {
+          "StringEquals": ":welcome",
+          "Next": "Welcome"
+        },
         {
           "StringEquals": ":event",
           "Next": "Event"
         }
       ]
     },
+    "Welcome": {
+      "Type": "Task",
+      "Resource": "HandleWelcome",
+      "TransitionEvent": ":established",
+      "Next": "Established"
+    },
     "Interrupt": {
       "Type": "Task",
       "Resource": "HandleInterrupt",
diff --git a/priv/repo/migrations/20200319210508_create_realms.exs b/priv/repo/migrations/20200319210508_create_realms.exs
deleted file mode 100644 (file)
index 86f165b..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do
-  use Ecto.Migration
-
-  def change do
-    create table("realms", primary_key: false) do
-      add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
-      add(:uri, :string, null: false)
-      add(:parent, :string, null: true)
-      timestamps()
-    end
-
-    create(unique_index(:realms, [:uri]))
-
-    create table("users", primary_key: false) do
-      add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
-      add(:authid, :string, null: false)
-      add(:password, :string, null: false)
-      add(:salt, :string, null: false)
-      add(:iterations, :integer, null: false)
-      add(:keylen, :integer, null: false)
-      add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false)
-      timestamps()
-    end
-
-    create(index(:users, [:authid]))
-    create(unique_index(:users, [:authid, :realm_id]))
-  end
-end
diff --git a/priv/router.json b/priv/router.json
deleted file mode 100644 (file)
index 9f60d81..0000000
+++ /dev/null
@@ -1,153 +0,0 @@
-{
-  "Comment": "Router Session State Machine",
-  "StartAt": "Init",
-  "States": {
-    "Init": {
-      "Type": "Task",
-      "Resource": "HandleInit",
-      "Next": "Established"
-    },
-    "Established": {
-      "Type": "Choice",
-      "Resource": "HandleEstablished",
-      "Choices": [
-        {
-          "StringEquals": ":message_received",
-          "Next": "Message"
-        },
-        {
-          "StringEquals": ":goodbye",
-          "Next": "GoodBye"
-        },
-        {
-          "StringEquals": ":abort",
-          "Next": "Abort"
-        }
-      ]
-    },
-    "Message": {
-      "Type": "Choice",
-      "Resource": "HandleMessage",
-      "Choices": [
-        {
-          "StringEquals": ":hello",
-          "Next": "Hello"
-        },
-        {
-          "StringEquals": ":authenticate",
-          "Next": "Authenticate"
-        },
-        {
-          "StringEquals": ":call",
-          "Next": "Call"
-        },
-        {
-          "StringEquals": ":yield",
-          "Next": "Yield"
-        },
-        {
-          "StringEquals": ":register",
-          "Next": "Register"
-        },
-        {
-          "StringEquals": ":unregister",
-          "Next": "Unregister"
-        },
-        {
-          "StringEquals": ":subscribe",
-          "Next": "Subscribe"
-        },
-        {
-          "StringEquals": ":unsubscribe",
-          "Next": "Unsubscribe"
-        },
-        {
-          "StringEquals": ":publish",
-          "Next": "Publish"
-        },
-        {
-          "StringEquals": ":error",
-          "Next": "Error"
-        },
-        {
-          "StringEquals": ":abort",
-          "Next": "Abort"
-        },
-        {
-          "StringEquals": ":goodbye",
-          "Next": "GoodBye"
-        },
-        {
-          "StringEquals": ":cancel",
-          "Next": "Cancel"
-        }
-      ]
-    },
-    "Hello": {
-      "Type": "Task",
-      "Resource": "HandleHello",
-      "Next": "Established"
-    },
-    "Authenticate": {
-      "Type": "Task",
-      "Resource": "HandleAuthenticate",
-      "Next": "Established"
-    },
-    "Call": {
-      "Type": "Task",
-      "Resource": "HandleCall",
-      "Next": "Established",
-      "Catch": [
-        {
-          "ErrorEquals": ["{:error, :no_live_callees}"],
-          "Next": "Error"
-        }
-      ]
-    },
-    "Yield": {
-      "Type": "Task",
-      "Resource": "HandleYield",
-      "Next": "Established"
-    },
-    "Register":{
-      "Type": "Task",
-      "Resource": "HandleRegister",
-      "Next": "Established"
-    },
-    "Unregister":{
-      "Type": "Task",
-      "Resource": "HandleUnregister",
-      "Next": "Established"
-    },
-    "Subscribe":{
-      "Type": "Task",
-      "Resource": "HandleSubscribe",
-      "Next": "Established"
-    },
-    "Unsubscribe":{
-      "Type": "Task",
-      "Resource": "HandleUnsubscribe",
-      "Next": "Established"
-    },
-    "Publish":{
-      "Type": "Task",
-      "Resource": "HandlePublish",
-      "Next": "Established"
-    },
-    "Error":{
-      "Type": "Task",
-      "Resource": "HandleError",
-      "Next": "Established"
-    },
-    "GoodBye": {
-      "Type": "Task",
-      "Resource": "HandleGoodbye",
-      "End": true
-    },
-    "Abort": {
-      "Type": "Task",
-      "Resource": "HandleAbort",
-      "End": true
-    }
-  }
-}