]> Entropealabs - wampex_router.git/commitdiff
initial handling of all incoming messages
authorChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 05:46:39 +0000 (23:46 -0600)
committerChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 05:46:39 +0000 (23:46 -0600)
lib/test.ex
lib/wampex/messages.ex [new file with mode: 0644]
lib/wampex/roles/callee.ex
lib/wampex/roles/caller.ex
lib/wampex/session.ex
lib/wampex/transport.ex
priv/session.json

index 6a6d77a2e0183fa3dfb677e59473e9c1bf0a2c7a..9a42332733be9940f9177999a491024b39f018bb 100644 (file)
@@ -5,6 +5,32 @@ defmodule Wampex.Test do
   @url "ws://localhost:18080/ws"
   @realm %Realm{name: "com.myrealm"}
   @roles [Callee, Caller, Publisher, Subscriber]
+  @device "s87d6f8s7df6"
+
+  def register do
+    sess = %Session{url: @url, realm: @realm, roles: @roles}
+    Wampex.start_link(TestCallee, sess)
+    :timer.sleep(500)
+    Wampex.send_request(TestCallee, Callee.register("com.actuator.#{@device}.light"))
+  end
+
+  def call do
+    sess = %Session{url: @url, realm: @realm, roles: @roles}
+    Wampex.start_link(TestCaller, sess)
+    :timer.sleep(500)
+
+    Wampex.send_request(
+      TestCaller,
+      Caller.call(
+        "com.actuator.#{@device}.light",
+        [1],
+        %{
+          color: "#FFFFFF"
+        },
+        %{}
+      )
+    )
+  end
 
   def subscribe do
     sess = %Session{url: @url, realm: @realm, roles: @roles}
diff --git a/lib/wampex/messages.ex b/lib/wampex/messages.ex
new file mode 100644 (file)
index 0000000..10adef2
--- /dev/null
@@ -0,0 +1,121 @@
+defmodule Wampex.Messages do
+  alias StatesLanguage, as: SL
+  alias Wampex.Session
+
+  require Logger
+
+  @welcome 2
+  @abort 3
+  @goodbye 6
+  @error 8
+  @published 17
+  @subscribed 33
+  @unsubscribed 35
+  @event 36
+  @result 50
+  @registered 65
+  @unregistered 67
+  @invocation 68
+
+  def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do
+    {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@abort, _dets, reason], sl) do
+    Logger.warn("Session aborted: #{reason}")
+    {sl, [{:next_event, :internal, :abort}]}
+  end
+
+  def handle([@goodbye, _dets, reason], sl) do
+    Logger.warn("Goodbye: #{reason}")
+    {sl, [{:next_event, :internal, :goodbye}]}
+  end
+
+  def handle([@error, type, id, dets, error], sl) do
+    handle([@error, type, id, dets, error, [], %{}], sl)
+  end
+
+  def handle([@error, type, id, dets, error, arg_l], sl) do
+    handle([@error, type, id, dets, error, arg_l, %{}], sl)
+  end
+
+  def handle([@error, type, id, dets, error, arg_l, arg_kw], sl) do
+    Logger.error("""
+    Request Type: #{type} 
+    Id: #{id}
+    Details: #{inspect(dets)}
+    Error: #{error}
+    ArgList: #{inspect(arg_l)}
+    ArgKW: #{inspect(arg_kw)}
+    """)
+
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@published, _request_id, id], sl) do
+    Logger.info("Published: #{id}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@subscribed, _request_id, id], sl) do
+    Logger.info("Subscribed: #{id}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@unsubscribed, request_id], sl) do
+    Logger.info("Unsubscribed: #{request_id}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@event, sub_id, pub_id, dets], sl) do
+    handle([@event, sub_id, pub_id, dets, [], %{}], sl)
+  end
+
+  def handle([@event, sub_id, pub_id, dets, arg_l], sl) do
+    handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl)
+  end
+
+  def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do
+    Logger.info(
+      "Got event for #{sub_id} from #{pub_id} with args #{inspect(arg_l)} and #{inspect(arg_kw)}"
+    )
+
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@result, id, dets], sl) do
+    handle([@result, id, dets, [], %{}], sl)
+  end
+
+  def handle([@result, id, dets, arg_l], sl) do
+    handle([@result, id, dets, arg_l, %{}], sl)
+  end
+
+  def handle([@result, id, _dets, arg_l, arg_kw], sl) do
+    Logger.info("Received result for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@unregistered, request_id], sl) do
+    Logger.info("Unregistered #{request_id}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@registered, _request_id, id], sl) do
+    Logger.info("Registered #{id}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+
+  def handle([@invocation, id, dets], sl) do
+    handle([@invocation, id, dets, [], %{}], sl)
+  end
+
+  def handle([@invocation, id, dets, arg_l], sl) do
+    handle([@invocation, id, dets, arg_l, %{}], sl)
+  end
+
+  def handle([@invocation, id, _dets, arg_l, arg_kw], sl) do
+    Logger.info("Received invocation for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}")
+    {sl, [{:next_event, :internal, :noop}]}
+  end
+end
index 3b802aaa151db862cd59aeca57ff68c53e4529e3..417a043f50634ad6d954b5b29004d84d02654353 100644 (file)
@@ -1,5 +1,16 @@
 defmodule Wampex.Role.Callee do
+  @register 64
+  @unregister 66
+
   def add(roles) do
     Map.put(roles, :callee, %{})
   end
+
+  def register(procedure) do
+    [@register, %{}, procedure]
+  end
+
+  def unregister(id) do
+    [@unregister, id]
+  end
 end
index e69a6016d0327e61cd5f7e495f57c056728732e2..03cd00a507aa69f6ebb9a3d13f145e295088f65c 100644 (file)
@@ -1,5 +1,23 @@
 defmodule Wampex.Role.Caller do
+  @call 48
+
   def add(roles) do
     Map.put(roles, :caller, %{})
   end
+
+  def call(procedure) do
+    [@call, %{}, procedure]
+  end
+
+  def call(procedure, opts) do
+    [@call, opts, procedure]
+  end
+
+  def call(procedure, args_l, opts) do
+    [@call, opts, procedure, args_l]
+  end
+
+  def call(procedure, args_l, args_kw, opts) do
+    [@call, opts, procedure, args_l, args_kw]
+  end
 end
index b2bf4c4e4206f471d86f646587c3dec9aebd457f..c3acead7362b414342463bf9b0df672564b3fd7d 100644 (file)
@@ -5,7 +5,7 @@ defmodule Wampex.Session do
 
   alias StatesLanguage, as: SL
   alias Wampex.Session, as: Sess
-  alias Wampex.Realm
+  alias Wampex.{Messages, Realm}
   alias Wampex.Serializer.JSON
   alias Wampex.Transport.WebSocket
 
@@ -62,7 +62,7 @@ defmodule Wampex.Session do
       ]
     )
 
-    {:ok, data, []}
+    {:ok, data, [{:next_event, :internal, :hello_sent}]}
   end
 
   def handle_resource(
@@ -75,9 +75,10 @@ defmodule Wampex.Session do
     {:ok, data, []}
   end
 
-  def handle_resource("HandleMessage", _, _, data) do
-    Logger.info("Handling Message #{inspect(data.data.message)}")
-    {:ok, data, [{:next_event, :internal, :noop}]}
+  def handle_resource("HandleMessage", _, _, %SL{data: %Sess{message: msg}} = data) do
+    Logger.info("Handling Message #{inspect(msg)}")
+    {data, actions} = Messages.handle(msg, data)
+    {:ok, data, actions}
   end
 
   def handle_resource("Abort", _, _, data) do
@@ -105,10 +106,6 @@ defmodule Wampex.Session do
     {:ok, data, []}
   end
 
-  def handle_info({:set_session_id, id}, "Init", %SL{data: %Sess{} = sess} = data) do
-    {:ok, %SL{data | data: %Sess{sess | id: id}}, [{:next_event, :internal, :welcome}]}
-  end
-
   def handle_info({:set_message, message}, "Established", %SL{data: %Sess{} = sess} = data) do
     {:ok, %SL{data | data: %Sess{sess | message: message}},
      [{:next_event, :internal, :message_received}]}
index 434d55f83c33058674724e9eb819057a644cdedd..cf6f835fc47f46061406b49a02f045888efec38c 100644 (file)
@@ -62,19 +62,9 @@ defmodule Wampex.Transport.WebSocket do
     {:reply, :ping, state}
   end
 
-  def handle_frame({type, data}, %{serializer: s} = state) do
-    Logger.info("Received #{type} data #{data}")
-    data = s.deserialize!(data)
-    Logger.info("Deserialized #{inspect(data)}")
-    handle_message(data, state)
-  end
-
-  def handle_message([2, id, _dets], state) do
-    send(state.session, {:set_session_id, id})
-    {:ok, state}
-  end
-
-  def handle_message(message, state) do
+  def handle_frame({_type, message}, %{serializer: s} = state) do
+    message = s.deserialize!(message)
+    Logger.info("Received #{inspect(message)}")
     send(state.session, {:set_message, message})
     {:ok, state}
   end
index 95c221876093b569d169ade7c1710de81dee3b51..642e173fea7d68a2d9b643d723d4a21ab984ffbb 100644 (file)
@@ -17,7 +17,7 @@
     "Init": {
       "Type": "Task",
       "Resource": "Hello",
-      "TransitionEvent": ":welcome",
+      "TransitionEvent": ":hello_sent",
       "Next": "Established",
       "Catch": [
         {
           "StringEquals": ":event",
           "Next": "BroadcastEvent"
         },
-        {
-          "StringEquals": ":yield",
-          "Next": "Yield"
-        },
         {
           "StringEquals": ":published",
           "Next": "Published"
         {
           "StringEquals": ":invocation",
           "Next": "Invocation"
+        },
+        {
+          "StringEquals": ":abort",
+          "Next": "Abort"
+        },
+        {
+          "StringEquals": ":goodbye",
+          "Next": "GoodBye"
         }
       ]
     },
-    "Yield": {
-      "Type": "Task",
-      "Resource": "YieldResult",
-      "TransitionEvent": ":yielded",
-      "Next": "Established"
-    },
-    "BroadcastEvent": {
-      "Type": "Task",
-      "Resource": "HandleBroadcast",
-      "Next": "Established"
-    },
     "Subscribed": {
       "Type": "Task",
       "Resource": "HandleSubscribed",