From: Christopher Date: Mon, 17 Feb 2020 05:46:39 +0000 (-0600) Subject: initial handling of all incoming messages X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=611687d17f96179853f5383933ab7600b61f6fa8;p=wampex_router.git initial handling of all incoming messages --- diff --git a/lib/test.ex b/lib/test.ex index 6a6d77a..9a42332 100644 --- a/lib/test.ex +++ b/lib/test.ex @@ -5,6 +5,32 @@ defmodule Wampex.Test do @url "ws://localhost:18080/ws" @realm %Realm{name: "com.myrealm"} @roles [Callee, Caller, Publisher, Subscriber] + @device "s87d6f8s7df6" + + def register do + sess = %Session{url: @url, realm: @realm, roles: @roles} + Wampex.start_link(TestCallee, sess) + :timer.sleep(500) + Wampex.send_request(TestCallee, Callee.register("com.actuator.#{@device}.light")) + end + + def call do + sess = %Session{url: @url, realm: @realm, roles: @roles} + Wampex.start_link(TestCaller, sess) + :timer.sleep(500) + + Wampex.send_request( + TestCaller, + Caller.call( + "com.actuator.#{@device}.light", + [1], + %{ + color: "#FFFFFF" + }, + %{} + ) + ) + end def subscribe do sess = %Session{url: @url, realm: @realm, roles: @roles} diff --git a/lib/wampex/messages.ex b/lib/wampex/messages.ex new file mode 100644 index 0000000..10adef2 --- /dev/null +++ b/lib/wampex/messages.ex @@ -0,0 +1,121 @@ +defmodule Wampex.Messages do + alias StatesLanguage, as: SL + alias Wampex.Session + + require Logger + + @welcome 2 + @abort 3 + @goodbye 6 + @error 8 + @published 17 + @subscribed 33 + @unsubscribed 35 + @event 36 + @result 50 + @registered 65 + @unregistered 67 + @invocation 68 + + def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do + {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}]} + end + + def handle([@abort, _dets, reason], sl) do + Logger.warn("Session aborted: #{reason}") + {sl, [{:next_event, :internal, :abort}]} + end + + def handle([@goodbye, _dets, reason], sl) do + Logger.warn("Goodbye: #{reason}") + {sl, [{:next_event, :internal, :goodbye}]} + end + + def handle([@error, type, id, dets, error], sl) do + handle([@error, type, id, dets, error, [], %{}], sl) + end + + def handle([@error, type, id, dets, error, arg_l], sl) do + handle([@error, type, id, dets, error, arg_l, %{}], sl) + end + + def handle([@error, type, id, dets, error, arg_l, arg_kw], sl) do + Logger.error(""" + Request Type: #{type} + Id: #{id} + Details: #{inspect(dets)} + Error: #{error} + ArgList: #{inspect(arg_l)} + ArgKW: #{inspect(arg_kw)} + """) + + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@published, _request_id, id], sl) do + Logger.info("Published: #{id}") + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@subscribed, _request_id, id], sl) do + Logger.info("Subscribed: #{id}") + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@unsubscribed, request_id], sl) do + Logger.info("Unsubscribed: #{request_id}") + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@event, sub_id, pub_id, dets], sl) do + handle([@event, sub_id, pub_id, dets, [], %{}], sl) + end + + def handle([@event, sub_id, pub_id, dets, arg_l], sl) do + handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl) + end + + def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do + Logger.info( + "Got event for #{sub_id} from #{pub_id} with args #{inspect(arg_l)} and #{inspect(arg_kw)}" + ) + + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@result, id, dets], sl) do + handle([@result, id, dets, [], %{}], sl) + end + + def handle([@result, id, dets, arg_l], sl) do + handle([@result, id, dets, arg_l, %{}], sl) + end + + def handle([@result, id, _dets, arg_l, arg_kw], sl) do + Logger.info("Received result for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}") + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@unregistered, request_id], sl) do + Logger.info("Unregistered #{request_id}") + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@registered, _request_id, id], sl) do + Logger.info("Registered #{id}") + {sl, [{:next_event, :internal, :noop}]} + end + + def handle([@invocation, id, dets], sl) do + handle([@invocation, id, dets, [], %{}], sl) + end + + def handle([@invocation, id, dets, arg_l], sl) do + handle([@invocation, id, dets, arg_l, %{}], sl) + end + + def handle([@invocation, id, _dets, arg_l, arg_kw], sl) do + Logger.info("Received invocation for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}") + {sl, [{:next_event, :internal, :noop}]} + end +end diff --git a/lib/wampex/roles/callee.ex b/lib/wampex/roles/callee.ex index 3b802aa..417a043 100644 --- a/lib/wampex/roles/callee.ex +++ b/lib/wampex/roles/callee.ex @@ -1,5 +1,16 @@ defmodule Wampex.Role.Callee do + @register 64 + @unregister 66 + def add(roles) do Map.put(roles, :callee, %{}) end + + def register(procedure) do + [@register, %{}, procedure] + end + + def unregister(id) do + [@unregister, id] + end end diff --git a/lib/wampex/roles/caller.ex b/lib/wampex/roles/caller.ex index e69a601..03cd00a 100644 --- a/lib/wampex/roles/caller.ex +++ b/lib/wampex/roles/caller.ex @@ -1,5 +1,23 @@ defmodule Wampex.Role.Caller do + @call 48 + def add(roles) do Map.put(roles, :caller, %{}) end + + def call(procedure) do + [@call, %{}, procedure] + end + + def call(procedure, opts) do + [@call, opts, procedure] + end + + def call(procedure, args_l, opts) do + [@call, opts, procedure, args_l] + end + + def call(procedure, args_l, args_kw, opts) do + [@call, opts, procedure, args_l, args_kw] + end end diff --git a/lib/wampex/session.ex b/lib/wampex/session.ex index b2bf4c4..c3acead 100644 --- a/lib/wampex/session.ex +++ b/lib/wampex/session.ex @@ -5,7 +5,7 @@ defmodule Wampex.Session do alias StatesLanguage, as: SL alias Wampex.Session, as: Sess - alias Wampex.Realm + alias Wampex.{Messages, Realm} alias Wampex.Serializer.JSON alias Wampex.Transport.WebSocket @@ -62,7 +62,7 @@ defmodule Wampex.Session do ] ) - {:ok, data, []} + {:ok, data, [{:next_event, :internal, :hello_sent}]} end def handle_resource( @@ -75,9 +75,10 @@ defmodule Wampex.Session do {:ok, data, []} end - def handle_resource("HandleMessage", _, _, data) do - Logger.info("Handling Message #{inspect(data.data.message)}") - {:ok, data, [{:next_event, :internal, :noop}]} + def handle_resource("HandleMessage", _, _, %SL{data: %Sess{message: msg}} = data) do + Logger.info("Handling Message #{inspect(msg)}") + {data, actions} = Messages.handle(msg, data) + {:ok, data, actions} end def handle_resource("Abort", _, _, data) do @@ -105,10 +106,6 @@ defmodule Wampex.Session do {:ok, data, []} end - def handle_info({:set_session_id, id}, "Init", %SL{data: %Sess{} = sess} = data) do - {:ok, %SL{data | data: %Sess{sess | id: id}}, [{:next_event, :internal, :welcome}]} - end - def handle_info({:set_message, message}, "Established", %SL{data: %Sess{} = sess} = data) do {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]} diff --git a/lib/wampex/transport.ex b/lib/wampex/transport.ex index 434d55f..cf6f835 100644 --- a/lib/wampex/transport.ex +++ b/lib/wampex/transport.ex @@ -62,19 +62,9 @@ defmodule Wampex.Transport.WebSocket do {:reply, :ping, state} end - def handle_frame({type, data}, %{serializer: s} = state) do - Logger.info("Received #{type} data #{data}") - data = s.deserialize!(data) - Logger.info("Deserialized #{inspect(data)}") - handle_message(data, state) - end - - def handle_message([2, id, _dets], state) do - send(state.session, {:set_session_id, id}) - {:ok, state} - end - - def handle_message(message, state) do + def handle_frame({_type, message}, %{serializer: s} = state) do + message = s.deserialize!(message) + Logger.info("Received #{inspect(message)}") send(state.session, {:set_message, message}) {:ok, state} end diff --git a/priv/session.json b/priv/session.json index 95c2218..642e173 100644 --- a/priv/session.json +++ b/priv/session.json @@ -17,7 +17,7 @@ "Init": { "Type": "Task", "Resource": "Hello", - "TransitionEvent": ":welcome", + "TransitionEvent": ":hello_sent", "Next": "Established", "Catch": [ { @@ -56,10 +56,6 @@ "StringEquals": ":event", "Next": "BroadcastEvent" }, - { - "StringEquals": ":yield", - "Next": "Yield" - }, { "StringEquals": ":published", "Next": "Published" @@ -87,20 +83,17 @@ { "StringEquals": ":invocation", "Next": "Invocation" + }, + { + "StringEquals": ":abort", + "Next": "Abort" + }, + { + "StringEquals": ":goodbye", + "Next": "GoodBye" } ] }, - "Yield": { - "Type": "Task", - "Resource": "YieldResult", - "TransitionEvent": ":yielded", - "Next": "Established" - }, - "BroadcastEvent": { - "Type": "Task", - "Resource": "HandleBroadcast", - "Next": "Established" - }, "Subscribed": { "Type": "Task", "Resource": "HandleSubscribed",