From: Christopher Date: Sun, 5 Apr 2020 01:41:41 +0000 (-0500) Subject: add welcome state, send event when session reconnects X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=7896d51d7493fd3477de2a226d7d97f2c3098736;p=wampex_client.git add welcome state, send event when session reconnects --- diff --git a/lib/client.ex b/lib/client.ex index b6fe5a0..ac5cc75 100644 --- a/lib/client.ex +++ b/lib/client.ex @@ -11,6 +11,7 @@ defmodule Wampex.Client do alias Wampex.Roles.Caller alias Wampex.Roles.Caller.Call alias Wampex.Roles.Dealer.Result + alias Wampex.Roles.Peer alias Wampex.Roles.Peer.Error alias Wampex.Roles.Publisher alias Wampex.Roles.Publisher.Publish @@ -29,26 +30,21 @@ defmodule Wampex.Client do {name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data, reconnect} ) do - session = session_name(name) subscriber_registry = subscriber_registry_name(name) callee_registry = callee_registry_name(name) - transport = transport_name(name) - session_data = %Sess{session_data | name: name, transport_pid: transport} + session_data = %Sess{session_data | name: name, roles: [Peer | session_data.roles]} children = [ {Registry, [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]}, {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}, - {Sess, [{:local, session}, session_data, []]}, {t, - [ - url: url, - session: session, - protocol: p, - serializer: s, - reconnect: reconnect, - opts: [name: transport] - ]} + name: name, + url: url, + session_data: session_data, + protocol: p, + serializer: s, + reconnect: reconnect} ] Supervisor.init(children, strategy: :one_for_all, max_restarts: 10) @@ -60,8 +56,6 @@ defmodule Wampex.Client do def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry]) @spec callee_registry_name(module()) :: module() def callee_registry_name(name), do: Module.concat([name, CalleeRegistry]) - @spec transport_name(module()) :: module() - def transport_name(name), do: Module.concat([name, Transport]) @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) :: {:ok, integer()} diff --git a/lib/client/session.ex b/lib/client/session.ex index fcf1b3c..8e1b6d5 100644 --- a/lib/client/session.ex +++ b/lib/client/session.ex @@ -71,6 +71,7 @@ defmodule Wampex.Client.Session do @established "Established" @init "Init" @challenge "Challenge" + @welcome "Welcome" @abort "Abort" @goodbye "Goodbye" @event "Event" @@ -79,10 +80,9 @@ defmodule Wampex.Client.Session do @handshake "Handshake" ## Resources - @init_transport "InitTransport" - @wait_transport "WaitForTransport" @hello "Hello" @handle_challenge "HandleChallenge" + @handle_welcome "HandleWelcome" @handle_established "HandleEstablished" @handle_abort "HandleAbort" @handle_goodbye "HandleGoodbye" @@ -184,17 +184,6 @@ defmodule Wampex.Client.Session do {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []} end - @impl true - def handle_resource( - @init_transport, - _, - @wait_transport, - data - ) do - Logger.debug("Waiting for transport to connect...") - {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []} - end - @impl true def handle_resource( @hello, @@ -233,6 +222,26 @@ defmodule Wampex.Client.Session do {:ok, sl, [{:next_event, :internal, :challenged}]} end + @impl true + def handle_resource( + @handle_welcome, + _, + @welcome, + %SL{data: %Sess{name: name}} = sl + ) do + sub = Client.subscriber_registry_name(name) + reg = Client.callee_registry_name(name) + subscribers = Registry.select(sub, [{{:_, :"$1", :_}, [], [:"$1"]}]) + callees = Registry.select(reg, [{{:_, :"$1", :_}, [], [:"$1"]}]) + pids = Enum.uniq(subscribers ++ callees) + + Enum.each(pids, fn pid -> + send(pid, {:connected, name}) + end) + + {:ok, sl, [{:next_event, :internal, :established}]} + end + @impl true def handle_resource( @goodbye, @@ -356,12 +365,6 @@ defmodule Wampex.Client.Session do [{:next_event, :internal, :message_received}]} end - @impl true - def handle_info({:connected, true}, _, _s) do - Logger.info("Reconnected, reset.") - exit(:reset) - end - defp handle_response(nil, actions, requests, _), do: {requests, actions} defp handle_response(id, actions, requests, response) do diff --git a/lib/client/transports/web_socket.ex b/lib/client/transports/web_socket.ex index 0b7db59..1c37e2f 100644 --- a/lib/client/transports/web_socket.ex +++ b/lib/client/transports/web_socket.ex @@ -1,8 +1,10 @@ defmodule Wampex.Client.Transports.WebSocket do use WebSockex - @behaviour Wampex.Client.Transport + # @behaviour Wampex.Client.Transport + alias Wampex.Client + alias Wampex.Client.Session alias WebSockex.Conn require Logger @@ -11,34 +13,33 @@ defmodule Wampex.Client.Transports.WebSocket do @ping_interval 20_000 @max_reconnect_interval 2 * 60_000 - @impl true def send_request(t, data) do WebSockex.cast(t, {:send_request, data}) end - @impl true def start_link( + name: name, url: url, - session: session, + session_data: session_data, protocol: protocol, serializer: serializer, - reconnect: reconnect, - opts: opts + reconnect: reconnect ) do - opts = Keyword.put(opts, :handle_initial_conn_failure, true) + conn = Conn.new(url, extra_headers: [{@header, protocol}]) - url - |> Conn.new(extra_headers: [{@header, protocol}]) - |> WebSockex.start_link( + WebSockex.start_link( + conn, __MODULE__, %{ - session: session, + name: name, + session: nil, + session_data: session_data, serializer: serializer, connected: false, reconnect: reconnect, reconnect_attempts: 1 }, - opts + handle_initial_conn_failure: true ) end @@ -53,7 +54,8 @@ defmodule Wampex.Client.Transports.WebSocket do end @impl true - def handle_disconnect(_status, %{reconnect_attempts: attempts} = state) do + def handle_disconnect(_status, %{session: sess, reconnect_attempts: attempts} = state) do + stop_session(sess) delay = get_backoff_delay(attempts) Logger.warn("Connection disconnected. Attempting reconnect in #{delay}") :timer.sleep(delay) @@ -61,11 +63,21 @@ defmodule Wampex.Client.Transports.WebSocket do end @impl true - def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do - Logger.debug("Connected to #{h}:#{p}#{pa}?#{q}") + def handle_connect( + %Conn{host: h, port: p, path: pa, query: q} = _conn, + %{name: name, session_data: sd} = state + ) do + Logger.info("Connected to #{h}:#{p}#{pa}?#{q}") Process.send_after(self(), :ping, @ping_interval) - send(state.session, {:connected, true}) - {:ok, %{state | reconnect_attempts: 1, connected: true}} + + {:ok, sess} = + Session.start_link( + {:local, Client.session_name(name)}, + %Session{sd | transport_pid: self()}, + [] + ) + + {:ok, %{state | session: sess, reconnect_attempts: 1, connected: true}} end @impl true @@ -97,10 +109,18 @@ defmodule Wampex.Client.Transports.WebSocket do end @impl true - def handle_frame({type, message}, %{serializer: s} = state) do + def handle_frame({_type, message}, %{serializer: s, session: sess} = state) do message = s.deserialize!(message) - Logger.debug("Received #{type} data: #{inspect(message)}") - send(state.session, {:set_message, message}) + send(sess, {:set_message, message}) {:ok, state} end + + def stop_session(nil), do: :noop + + def stop_session(pid) do + case Process.alive?(pid) do + true -> Process.exit(pid, :normal) + false -> :noop + end + end end diff --git a/mix.exs b/mix.exs index 2a0dbb8..8223037 100644 --- a/mix.exs +++ b/mix.exs @@ -45,9 +45,10 @@ defmodule Wampex.Client.MixProject do {:msgpack, "~> 0.7.0"}, {:pbkdf2, "~> 2.0"}, {:states_language, "~> 0.2"}, + # {:wampex, path: "../wampex"}, {:wampex, git: "https://gitlab.com/entropealabs/wampex.git", - tag: "61d834a784f5f14ec90508cf7556a5ebaec113c3"}, + tag: "7d656d9748cb0e4d08c625d87e315839afecb4b2"}, {:websockex, "~> 0.4.2"} ] end diff --git a/priv/client.json b/priv/client.json index b09b052..5c6795e 100644 --- a/priv/client.json +++ b/priv/client.json @@ -1,19 +1,7 @@ { "Comment": "Session State Machine", - "StartAt": "WaitForTransport", + "StartAt": "Init", "States": { - "WaitForTransport": { - "Type": "Task", - "Resource": "InitTransport", - "TransitionEvent": "{:connected, true}", - "Next": "Init", - "Catch": [ - { - "ErrorEquals": ["{:connected, false}"], - "Next": "Abort" - } - ] - }, "Init": { "Type": "Task", "Resource": "Hello", @@ -67,6 +55,10 @@ "StringEquals": ":established", "Next": "Established" }, + { + "StringEquals": ":welcome", + "Next": "Welcome" + }, { "StringEquals": ":event", "Next": "Event" @@ -93,6 +85,12 @@ } ] }, + "Welcome": { + "Type": "Task", + "Resource": "HandleWelcome", + "TransitionEvent": ":established", + "Next": "Established" + }, "Interrupt": { "Type": "Task", "Resource": "HandleInterrupt", diff --git a/priv/repo/migrations/20200319210508_create_realms.exs b/priv/repo/migrations/20200319210508_create_realms.exs deleted file mode 100644 index 86f165b..0000000 --- a/priv/repo/migrations/20200319210508_create_realms.exs +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do - use Ecto.Migration - - def change do - create table("realms", primary_key: false) do - add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()")) - add(:uri, :string, null: false) - add(:parent, :string, null: true) - timestamps() - end - - create(unique_index(:realms, [:uri])) - - create table("users", primary_key: false) do - add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()")) - add(:authid, :string, null: false) - add(:password, :string, null: false) - add(:salt, :string, null: false) - add(:iterations, :integer, null: false) - add(:keylen, :integer, null: false) - add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false) - timestamps() - end - - create(index(:users, [:authid])) - create(unique_index(:users, [:authid, :realm_id])) - end -end diff --git a/priv/router.json b/priv/router.json deleted file mode 100644 index 9f60d81..0000000 --- a/priv/router.json +++ /dev/null @@ -1,153 +0,0 @@ -{ - "Comment": "Router Session State Machine", - "StartAt": "Init", - "States": { - "Init": { - "Type": "Task", - "Resource": "HandleInit", - "Next": "Established" - }, - "Established": { - "Type": "Choice", - "Resource": "HandleEstablished", - "Choices": [ - { - "StringEquals": ":message_received", - "Next": "Message" - }, - { - "StringEquals": ":goodbye", - "Next": "GoodBye" - }, - { - "StringEquals": ":abort", - "Next": "Abort" - } - ] - }, - "Message": { - "Type": "Choice", - "Resource": "HandleMessage", - "Choices": [ - { - "StringEquals": ":hello", - "Next": "Hello" - }, - { - "StringEquals": ":authenticate", - "Next": "Authenticate" - }, - { - "StringEquals": ":call", - "Next": "Call" - }, - { - "StringEquals": ":yield", - "Next": "Yield" - }, - { - "StringEquals": ":register", - "Next": "Register" - }, - { - "StringEquals": ":unregister", - "Next": "Unregister" - }, - { - "StringEquals": ":subscribe", - "Next": "Subscribe" - }, - { - "StringEquals": ":unsubscribe", - "Next": "Unsubscribe" - }, - { - "StringEquals": ":publish", - "Next": "Publish" - }, - { - "StringEquals": ":error", - "Next": "Error" - }, - { - "StringEquals": ":abort", - "Next": "Abort" - }, - { - "StringEquals": ":goodbye", - "Next": "GoodBye" - }, - { - "StringEquals": ":cancel", - "Next": "Cancel" - } - ] - }, - "Hello": { - "Type": "Task", - "Resource": "HandleHello", - "Next": "Established" - }, - "Authenticate": { - "Type": "Task", - "Resource": "HandleAuthenticate", - "Next": "Established" - }, - "Call": { - "Type": "Task", - "Resource": "HandleCall", - "Next": "Established", - "Catch": [ - { - "ErrorEquals": ["{:error, :no_live_callees}"], - "Next": "Error" - } - ] - }, - "Yield": { - "Type": "Task", - "Resource": "HandleYield", - "Next": "Established" - }, - "Register":{ - "Type": "Task", - "Resource": "HandleRegister", - "Next": "Established" - }, - "Unregister":{ - "Type": "Task", - "Resource": "HandleUnregister", - "Next": "Established" - }, - "Subscribe":{ - "Type": "Task", - "Resource": "HandleSubscribe", - "Next": "Established" - }, - "Unsubscribe":{ - "Type": "Task", - "Resource": "HandleUnsubscribe", - "Next": "Established" - }, - "Publish":{ - "Type": "Task", - "Resource": "HandlePublish", - "Next": "Established" - }, - "Error":{ - "Type": "Task", - "Resource": "HandleError", - "Next": "Established" - }, - "GoodBye": { - "Type": "Task", - "Resource": "HandleGoodbye", - "End": true - }, - "Abort": { - "Type": "Task", - "Resource": "HandleAbort", - "End": true - } - } -}