]> Entropealabs - wampex_client.git/commitdiff
supervisor structure
authorChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 04:04:36 +0000 (22:04 -0600)
committerChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 04:04:36 +0000 (22:04 -0600)
lib/test.ex [new file with mode: 0644]
lib/wampex.ex
lib/wampex/realm.ex [new file with mode: 0644]
lib/wampex/roles/callee.ex [new file with mode: 0644]
lib/wampex/roles/caller.ex [new file with mode: 0644]
lib/wampex/roles/publisher.ex [new file with mode: 0644]
lib/wampex/roles/subscriber.ex [new file with mode: 0644]
lib/wampex/session.ex
lib/wampex/transport.ex

diff --git a/lib/test.ex b/lib/test.ex
new file mode 100644 (file)
index 0000000..6a6d77a
--- /dev/null
@@ -0,0 +1,30 @@
+defmodule Wampex.Test do
+  alias Wampex.{Realm, Session}
+  alias Wampex.Role.{Callee, Caller, Publisher, Subscriber}
+
+  @url "ws://localhost:18080/ws"
+  @realm %Realm{name: "com.myrealm"}
+  @roles [Callee, Caller, Publisher, Subscriber]
+
+  def subscribe do
+    sess = %Session{url: @url, realm: @realm, roles: @roles}
+    Wampex.start_link(TestSubscriber, sess)
+    :timer.sleep(500)
+    Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp"))
+  end
+
+  def publish do
+    sess = %Session{url: @url, realm: @realm, roles: @roles}
+    Wampex.start_link(TestPublisher, sess)
+    :timer.sleep(500)
+
+    Enum.each(1..10, fn _ ->
+      Wampex.send_request(
+        TestPublisher,
+        Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"}, %{})
+      )
+
+      :timer.sleep(500)
+    end)
+  end
+end
index 96f2178d5aea51c0937dc7badb78236f54da19b8..3c67cd2b7ffdc6b215a16d596933a52f2d1d787d 100644 (file)
@@ -2,11 +2,35 @@ defmodule Wampex do
   @moduledoc """
   Documentation for Wampex.
   """
-  alias Wampex.Session
+  use Supervisor
 
-  def test do
-    url = "ws://localhost:18080/ws"
-    sess = %Session{url: url}
-    Session.start_link(sess)
+  alias Wampex.Session, as: Sess
+
+  def start_link(name, %Sess{} = session_data) when is_atom(name) do
+    Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
+  end
+
+  def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
+    session_name = session_name(name)
+    subscriber_registry = subscriber_registry_name(name)
+    transport_name = transport_name(name)
+    session_data = %Sess{session_data | transport_pid: transport_name}
+
+    children = [
+      {Sess, [{:local, session_name}, session_data, []]},
+      {t,
+       [url: url, session: session_name, protocol: p, serializer: s, opts: [name: transport_name]]},
+      {Registry, [keys: :duplicate, name: subscriber_registry]}
+    ]
+
+    Supervisor.init(children, strategy: :one_for_all)
+  end
+
+  def session_name(name), do: Module.concat([name, Session])
+  def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
+  def transport_name(name), do: Module.concat([name, Transport])
+
+  def send_request(name, request) do
+    Sess.send_request(session_name(name), request)
   end
 end
diff --git a/lib/wampex/realm.ex b/lib/wampex/realm.ex
new file mode 100644 (file)
index 0000000..6e63515
--- /dev/null
@@ -0,0 +1,5 @@
+defmodule Wampex.Realm do
+  @moduledoc false
+
+  defstruct [:name, :authentication]
+end
diff --git a/lib/wampex/roles/callee.ex b/lib/wampex/roles/callee.ex
new file mode 100644 (file)
index 0000000..3b802aa
--- /dev/null
@@ -0,0 +1,5 @@
+defmodule Wampex.Role.Callee do
+  def add(roles) do
+    Map.put(roles, :callee, %{})
+  end
+end
diff --git a/lib/wampex/roles/caller.ex b/lib/wampex/roles/caller.ex
new file mode 100644 (file)
index 0000000..e69a601
--- /dev/null
@@ -0,0 +1,5 @@
+defmodule Wampex.Role.Caller do
+  def add(roles) do
+    Map.put(roles, :caller, %{})
+  end
+end
diff --git a/lib/wampex/roles/publisher.ex b/lib/wampex/roles/publisher.ex
new file mode 100644 (file)
index 0000000..1207c2d
--- /dev/null
@@ -0,0 +1,23 @@
+defmodule Wampex.Role.Publisher do
+  @publish 16
+
+  def add(roles) do
+    Map.put(roles, :publisher, %{})
+  end
+
+  def publish(topic) do
+    [@publish, %{}, topic]
+  end
+
+  def publish(topic, opts) do
+    [@publish, opts, topic]
+  end
+
+  def publish(topic, args_l, opts) do
+    [@publish, opts, topic, args_l]
+  end
+
+  def publish(topic, args_l, args_kw, opts) do
+    [@publish, opts, topic, args_l, args_kw]
+  end
+end
diff --git a/lib/wampex/roles/subscriber.ex b/lib/wampex/roles/subscriber.ex
new file mode 100644 (file)
index 0000000..84f4fe1
--- /dev/null
@@ -0,0 +1,17 @@
+defmodule Wampex.Role.Subscriber do
+  @moduledoc false
+  @subscribe 32
+  @unsubscribe 34
+
+  def add(roles) do
+    Map.put(roles, :subscriber, %{})
+  end
+
+  def subscribe(topic, opts \\ %{}) when is_binary(topic) and is_map(opts) do
+    [@subscribe, opts, topic]
+  end
+
+  def unsubscribe(sub_id) do
+    [@unsubscribe, sub_id]
+  end
+end
index e6283e1b12505510b64b1d138ae13a02a6cc8b8b..b2bf4c4e4206f471d86f646587c3dec9aebd457f 100644 (file)
@@ -1,8 +1,11 @@
 defmodule Wampex.Session do
   use StatesLanguage, data: "priv/session.json"
 
+  @max_id 9_007_199_254_740_992
+
   alias StatesLanguage, as: SL
   alias Wampex.Session, as: Sess
+  alias Wampex.Realm
   alias Wampex.Serializer.JSON
   alias Wampex.Transport.WebSocket
 
@@ -11,28 +14,51 @@ defmodule Wampex.Session do
     :url,
     :transport_pid,
     :message,
-    request_id: 1,
+    :realm,
+    roles: [],
+    request_id: 0,
     transport: WebSocket,
     protocol: "wamp.2.json",
     serializer: JSON
   ]
 
-  def get_id do
-    Enum.random(1..9_007_199_254_740_992)
+  def get_request_id(current_id) when current_id == @max_id do
+    1
+  end
+
+  def get_request_id(current_id) do
+    current_id + 1
+  end
+
+  def send_request(p, request) do
+    __MODULE__.cast(p, {:send_request, request})
+  end
+
+  def handle_cast(
+        {:send_request, request},
+        "Established",
+        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
+      ) do
+    request_id = get_request_id(r_id)
+    request = List.insert_at(request, 1, request_id)
+    tt.send_request(t, request)
+    {:ok, %SL{data | data: %Sess{sess | request_id: request_id}}, []}
   end
 
   def handle_resource(
         "Hello",
         _,
         "Init",
-        %SL{data: %Sess{transport: tt, transport_pid: t}} = data
+        %SL{
+          data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}}
+        } = data
       ) do
-    tt.send_message(
+    tt.send_request(
       t,
       [
         1,
-        "com.myrealm",
-        %{roles: %{publisher: %{}, subscriber: %{}, caller: %{}, callee: %{}}}
+        realm,
+        %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}
       ]
     )
 
@@ -50,7 +76,7 @@ defmodule Wampex.Session do
   end
 
   def handle_resource("HandleMessage", _, _, data) do
-    Logger.info("Handling Message")
+    Logger.info("Handling Message #{inspect(data.data.message)}")
     {:ok, data, [{:next_event, :internal, :noop}]}
   end
 
@@ -63,14 +89,14 @@ defmodule Wampex.Session do
         "InitTransport",
         _,
         "WaitForTransport",
-        %SL{data: %Sess{url: url, transport: t, serializer: s, protocol: p} = sess} = data
+        data
       ) do
-    {:ok, pid} = t.start_link(url, self(), p, s)
-    {:ok, %SL{data | data: %Sess{sess | transport_pid: pid}}, []}
+    Logger.info("Waiting for transport to connect...")
+    {:ok, data, []}
   end
 
   def handle_resource("Established", _, "Established", data) do
-    Logger.info("Session #{data.data.id} Established")
+    Logger.info("Session #{data.data.id}")
     {:ok, data, []}
   end
 
index 8f7fe6605f0390794abb39eb5cdd4eea83084bd5..434d55f83c33058674724e9eb819057a644cdedd 100644 (file)
@@ -6,9 +6,10 @@ defmodule Wampex.Transport.WebSocket do
   require Logger
 
   @header "Sec-WebSocket-Protocol"
+  @ping_interval 20_000
 
-  def send_message(t, data) do
-    WebSockex.cast(t, {:send_message, data})
+  def send_request(t, data) do
+    WebSockex.cast(t, {:send_request, data})
   end
 
   def connected?(t, resp) do
@@ -16,11 +17,11 @@ defmodule Wampex.Transport.WebSocket do
   end
 
   def start_link(
-        url,
-        session,
-        protocol,
-        serializer,
-        opts \\ []
+        url: url,
+        session: session,
+        protocol: protocol,
+        serializer: serializer,
+        opts: opts
       ) do
     url
     |> Conn.new(extra_headers: [{@header, protocol}])
@@ -33,11 +34,21 @@ defmodule Wampex.Transport.WebSocket do
 
   def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
     Logger.info("Connected to #{h}:#{p}#{pa}?#{q}")
+    Process.send_after(self(), :ping, @ping_interval)
     send(state.session, {:connected, true})
     {:ok, %{state | connected: true}}
   end
 
-  def handle_cast({:send_message, data}, %{serializer: s} = state) do
+  def handle_ping(_ping, state) do
+    {:reply, :pong, state}
+  end
+
+  def handle_pong(_pong, state) do
+    {:ok, state}
+  end
+
+  def handle_cast({:send_request, data}, %{serializer: s} = state) do
+    Logger.info("Sending Request #{inspect(data)}")
     {:reply, {s.data_type(), s.serialize!(data)}, state}
   end
 
@@ -46,6 +57,11 @@ defmodule Wampex.Transport.WebSocket do
     {:ok, state}
   end
 
+  def handle_info(:ping, state) do
+    Process.send_after(self(), :ping, @ping_interval)
+    {:reply, :ping, state}
+  end
+
   def handle_frame({type, data}, %{serializer: s} = state) do
     Logger.info("Received #{type} data #{data}")
     data = s.deserialize!(data)
@@ -60,5 +76,6 @@ defmodule Wampex.Transport.WebSocket do
 
   def handle_message(message, state) do
     send(state.session, {:set_message, message})
+    {:ok, state}
   end
 end