From fa012604c862930c2480e89ded28e3d2a21b6de8 Mon Sep 17 00:00:00 2001 From: Christopher Date: Sun, 16 Feb 2020 22:04:36 -0600 Subject: [PATCH] supervisor structure --- lib/test.ex | 30 ++++++++++++++++++++ lib/wampex.ex | 34 +++++++++++++++++++---- lib/wampex/realm.ex | 5 ++++ lib/wampex/roles/callee.ex | 5 ++++ lib/wampex/roles/caller.ex | 5 ++++ lib/wampex/roles/publisher.ex | 23 ++++++++++++++++ lib/wampex/roles/subscriber.ex | 17 ++++++++++++ lib/wampex/session.ex | 50 ++++++++++++++++++++++++++-------- lib/wampex/transport.ex | 33 ++++++++++++++++------ 9 files changed, 177 insertions(+), 25 deletions(-) create mode 100644 lib/test.ex create mode 100644 lib/wampex/realm.ex create mode 100644 lib/wampex/roles/callee.ex create mode 100644 lib/wampex/roles/caller.ex create mode 100644 lib/wampex/roles/publisher.ex create mode 100644 lib/wampex/roles/subscriber.ex diff --git a/lib/test.ex b/lib/test.ex new file mode 100644 index 0000000..6a6d77a --- /dev/null +++ b/lib/test.ex @@ -0,0 +1,30 @@ +defmodule Wampex.Test do + alias Wampex.{Realm, Session} + alias Wampex.Role.{Callee, Caller, Publisher, Subscriber} + + @url "ws://localhost:18080/ws" + @realm %Realm{name: "com.myrealm"} + @roles [Callee, Caller, Publisher, Subscriber] + + def subscribe do + sess = %Session{url: @url, realm: @realm, roles: @roles} + Wampex.start_link(TestSubscriber, sess) + :timer.sleep(500) + Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp")) + end + + def publish do + sess = %Session{url: @url, realm: @realm, roles: @roles} + Wampex.start_link(TestPublisher, sess) + :timer.sleep(500) + + Enum.each(1..10, fn _ -> + Wampex.send_request( + TestPublisher, + Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"}, %{}) + ) + + :timer.sleep(500) + end) + end +end diff --git a/lib/wampex.ex b/lib/wampex.ex index 96f2178..3c67cd2 100644 --- a/lib/wampex.ex +++ b/lib/wampex.ex @@ -2,11 +2,35 @@ defmodule Wampex do @moduledoc """ Documentation for Wampex. """ - alias Wampex.Session + use Supervisor - def test do - url = "ws://localhost:18080/ws" - sess = %Session{url: url} - Session.start_link(sess) + alias Wampex.Session, as: Sess + + def start_link(name, %Sess{} = session_data) when is_atom(name) do + Supervisor.start_link(__MODULE__, {name, session_data}, name: name) + end + + def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do + session_name = session_name(name) + subscriber_registry = subscriber_registry_name(name) + transport_name = transport_name(name) + session_data = %Sess{session_data | transport_pid: transport_name} + + children = [ + {Sess, [{:local, session_name}, session_data, []]}, + {t, + [url: url, session: session_name, protocol: p, serializer: s, opts: [name: transport_name]]}, + {Registry, [keys: :duplicate, name: subscriber_registry]} + ] + + Supervisor.init(children, strategy: :one_for_all) + end + + def session_name(name), do: Module.concat([name, Session]) + def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry]) + def transport_name(name), do: Module.concat([name, Transport]) + + def send_request(name, request) do + Sess.send_request(session_name(name), request) end end diff --git a/lib/wampex/realm.ex b/lib/wampex/realm.ex new file mode 100644 index 0000000..6e63515 --- /dev/null +++ b/lib/wampex/realm.ex @@ -0,0 +1,5 @@ +defmodule Wampex.Realm do + @moduledoc false + + defstruct [:name, :authentication] +end diff --git a/lib/wampex/roles/callee.ex b/lib/wampex/roles/callee.ex new file mode 100644 index 0000000..3b802aa --- /dev/null +++ b/lib/wampex/roles/callee.ex @@ -0,0 +1,5 @@ +defmodule Wampex.Role.Callee do + def add(roles) do + Map.put(roles, :callee, %{}) + end +end diff --git a/lib/wampex/roles/caller.ex b/lib/wampex/roles/caller.ex new file mode 100644 index 0000000..e69a601 --- /dev/null +++ b/lib/wampex/roles/caller.ex @@ -0,0 +1,5 @@ +defmodule Wampex.Role.Caller do + def add(roles) do + Map.put(roles, :caller, %{}) + end +end diff --git a/lib/wampex/roles/publisher.ex b/lib/wampex/roles/publisher.ex new file mode 100644 index 0000000..1207c2d --- /dev/null +++ b/lib/wampex/roles/publisher.ex @@ -0,0 +1,23 @@ +defmodule Wampex.Role.Publisher do + @publish 16 + + def add(roles) do + Map.put(roles, :publisher, %{}) + end + + def publish(topic) do + [@publish, %{}, topic] + end + + def publish(topic, opts) do + [@publish, opts, topic] + end + + def publish(topic, args_l, opts) do + [@publish, opts, topic, args_l] + end + + def publish(topic, args_l, args_kw, opts) do + [@publish, opts, topic, args_l, args_kw] + end +end diff --git a/lib/wampex/roles/subscriber.ex b/lib/wampex/roles/subscriber.ex new file mode 100644 index 0000000..84f4fe1 --- /dev/null +++ b/lib/wampex/roles/subscriber.ex @@ -0,0 +1,17 @@ +defmodule Wampex.Role.Subscriber do + @moduledoc false + @subscribe 32 + @unsubscribe 34 + + def add(roles) do + Map.put(roles, :subscriber, %{}) + end + + def subscribe(topic, opts \\ %{}) when is_binary(topic) and is_map(opts) do + [@subscribe, opts, topic] + end + + def unsubscribe(sub_id) do + [@unsubscribe, sub_id] + end +end diff --git a/lib/wampex/session.ex b/lib/wampex/session.ex index e6283e1..b2bf4c4 100644 --- a/lib/wampex/session.ex +++ b/lib/wampex/session.ex @@ -1,8 +1,11 @@ defmodule Wampex.Session do use StatesLanguage, data: "priv/session.json" + @max_id 9_007_199_254_740_992 + alias StatesLanguage, as: SL alias Wampex.Session, as: Sess + alias Wampex.Realm alias Wampex.Serializer.JSON alias Wampex.Transport.WebSocket @@ -11,28 +14,51 @@ defmodule Wampex.Session do :url, :transport_pid, :message, - request_id: 1, + :realm, + roles: [], + request_id: 0, transport: WebSocket, protocol: "wamp.2.json", serializer: JSON ] - def get_id do - Enum.random(1..9_007_199_254_740_992) + def get_request_id(current_id) when current_id == @max_id do + 1 + end + + def get_request_id(current_id) do + current_id + 1 + end + + def send_request(p, request) do + __MODULE__.cast(p, {:send_request, request}) + end + + def handle_cast( + {:send_request, request}, + "Established", + %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data + ) do + request_id = get_request_id(r_id) + request = List.insert_at(request, 1, request_id) + tt.send_request(t, request) + {:ok, %SL{data | data: %Sess{sess | request_id: request_id}}, []} end def handle_resource( "Hello", _, "Init", - %SL{data: %Sess{transport: tt, transport_pid: t}} = data + %SL{ + data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}} + } = data ) do - tt.send_message( + tt.send_request( t, [ 1, - "com.myrealm", - %{roles: %{publisher: %{}, subscriber: %{}, caller: %{}, callee: %{}}} + realm, + %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)} ] ) @@ -50,7 +76,7 @@ defmodule Wampex.Session do end def handle_resource("HandleMessage", _, _, data) do - Logger.info("Handling Message") + Logger.info("Handling Message #{inspect(data.data.message)}") {:ok, data, [{:next_event, :internal, :noop}]} end @@ -63,14 +89,14 @@ defmodule Wampex.Session do "InitTransport", _, "WaitForTransport", - %SL{data: %Sess{url: url, transport: t, serializer: s, protocol: p} = sess} = data + data ) do - {:ok, pid} = t.start_link(url, self(), p, s) - {:ok, %SL{data | data: %Sess{sess | transport_pid: pid}}, []} + Logger.info("Waiting for transport to connect...") + {:ok, data, []} end def handle_resource("Established", _, "Established", data) do - Logger.info("Session #{data.data.id} Established") + Logger.info("Session #{data.data.id}") {:ok, data, []} end diff --git a/lib/wampex/transport.ex b/lib/wampex/transport.ex index 8f7fe66..434d55f 100644 --- a/lib/wampex/transport.ex +++ b/lib/wampex/transport.ex @@ -6,9 +6,10 @@ defmodule Wampex.Transport.WebSocket do require Logger @header "Sec-WebSocket-Protocol" + @ping_interval 20_000 - def send_message(t, data) do - WebSockex.cast(t, {:send_message, data}) + def send_request(t, data) do + WebSockex.cast(t, {:send_request, data}) end def connected?(t, resp) do @@ -16,11 +17,11 @@ defmodule Wampex.Transport.WebSocket do end def start_link( - url, - session, - protocol, - serializer, - opts \\ [] + url: url, + session: session, + protocol: protocol, + serializer: serializer, + opts: opts ) do url |> Conn.new(extra_headers: [{@header, protocol}]) @@ -33,11 +34,21 @@ defmodule Wampex.Transport.WebSocket do def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do Logger.info("Connected to #{h}:#{p}#{pa}?#{q}") + Process.send_after(self(), :ping, @ping_interval) send(state.session, {:connected, true}) {:ok, %{state | connected: true}} end - def handle_cast({:send_message, data}, %{serializer: s} = state) do + def handle_ping(_ping, state) do + {:reply, :pong, state} + end + + def handle_pong(_pong, state) do + {:ok, state} + end + + def handle_cast({:send_request, data}, %{serializer: s} = state) do + Logger.info("Sending Request #{inspect(data)}") {:reply, {s.data_type(), s.serialize!(data)}, state} end @@ -46,6 +57,11 @@ defmodule Wampex.Transport.WebSocket do {:ok, state} end + def handle_info(:ping, state) do + Process.send_after(self(), :ping, @ping_interval) + {:reply, :ping, state} + end + def handle_frame({type, data}, %{serializer: s} = state) do Logger.info("Received #{type} data #{data}") data = s.deserialize!(data) @@ -60,5 +76,6 @@ defmodule Wampex.Transport.WebSocket do def handle_message(message, state) do send(state.session, {:set_message, message}) + {:ok, state} end end -- 2.45.3