From: Christopher Date: Mon, 17 Feb 2020 06:43:26 +0000 (-0600) Subject: adds msgpack support X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=b4f5fedc0774f4778e1f5c4f39f83e870b96adc0;p=wampex_router.git adds msgpack support --- diff --git a/lib/test.ex b/lib/test.ex index 9a42332..d995ea1 100644 --- a/lib/test.ex +++ b/lib/test.ex @@ -1,22 +1,29 @@ -defmodule Wampex.Test do +defmodule Test do alias Wampex.{Realm, Session} alias Wampex.Role.{Callee, Caller, Publisher, Subscriber} + alias Wampex.Serializer.MessagePack @url "ws://localhost:18080/ws" @realm %Realm{name: "com.myrealm"} @roles [Callee, Caller, Publisher, Subscriber] @device "s87d6f8s7df6" + @session %Session{ + serializer: MessagePack, + protocol: "wamp.2.msgpack", + url: @url, + realm: @realm, + roles: @roles + } + def register do - sess = %Session{url: @url, realm: @realm, roles: @roles} - Wampex.start_link(TestCallee, sess) + Wampex.start_link(TestCallee, @session) :timer.sleep(500) Wampex.send_request(TestCallee, Callee.register("com.actuator.#{@device}.light")) end def call do - sess = %Session{url: @url, realm: @realm, roles: @roles} - Wampex.start_link(TestCaller, sess) + Wampex.start_link(TestCaller, @session) :timer.sleep(500) Wampex.send_request( @@ -33,15 +40,13 @@ defmodule Wampex.Test do end def subscribe do - sess = %Session{url: @url, realm: @realm, roles: @roles} - Wampex.start_link(TestSubscriber, sess) + Wampex.start_link(TestSubscriber, @session) :timer.sleep(500) Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp")) end def publish do - sess = %Session{url: @url, realm: @realm, roles: @roles} - Wampex.start_link(TestPublisher, sess) + Wampex.start_link(TestPublisher, @session) :timer.sleep(500) Enum.each(1..10, fn _ -> diff --git a/lib/wampex/serializers/message_pack.ex b/lib/wampex/serializers/message_pack.ex new file mode 100644 index 0000000..65078b5 --- /dev/null +++ b/lib/wampex/serializers/message_pack.ex @@ -0,0 +1,13 @@ +defmodule Wampex.Serializer.MessagePack do + def data_type, do: :binary + def serialize!(data), do: :msgpack.pack(data, []) + def serialize(data), do: :msgpack.pack(data, []) + + def deserialize!(data) do + {:ok, res} = :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}]) + res + end + + def deserialize(data), + do: :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}]) +end diff --git a/lib/wampex/transport.ex b/lib/wampex/transports/web_socket.ex similarity index 92% rename from lib/wampex/transport.ex rename to lib/wampex/transports/web_socket.ex index cf6f835..61e6a8b 100644 --- a/lib/wampex/transport.ex +++ b/lib/wampex/transports/web_socket.ex @@ -62,9 +62,9 @@ defmodule Wampex.Transport.WebSocket do {:reply, :ping, state} end - def handle_frame({_type, message}, %{serializer: s} = state) do + def handle_frame({type, message}, %{serializer: s} = state) do message = s.deserialize!(message) - Logger.info("Received #{inspect(message)}") + Logger.info("Received #{type} data: #{inspect(message)}") send(state.session, {:set_message, message}) {:ok, state} end diff --git a/mix.exs b/mix.exs index 3a1fa87..071b191 100644 --- a/mix.exs +++ b/mix.exs @@ -22,6 +22,7 @@ defmodule Wampex.MixProject do defp deps do [ {:jason, "~> 1.1"}, + {:msgpack, "~> 0.7.0"}, {:states_language, "~> 0.2"}, {:websockex, "~> 0.4"} # {:dep_from_hexpm, "~> 0.3.0"}, diff --git a/mix.lock b/mix.lock index 3fadc7a..b73774c 100644 --- a/mix.lock +++ b/mix.lock @@ -3,6 +3,7 @@ "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"}, + "msgpack": {:hex, :msgpack, "0.7.0", "128ae0a2227c7e7a2847c0f0f73551c268464f8c1ee96bffb920bc0a5712b295", [:rebar3], [], "hexpm", "4649353da003e6f438d105e4b1e0f17757f6f5ec8687a6f30875ff3ac4ce2a51"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, diff --git a/priv/session.json b/priv/session.json index 642e173..7cb0f50 100644 --- a/priv/session.json +++ b/priv/session.json @@ -21,11 +21,21 @@ "Next": "Established", "Catch": [ { - "ErrorEquals": [":abort", ":invalid_credentials"], + "ErrorEquals": [":abort"], "Next": "Abort" + }, + { + "ErrorEquals": [":invalid_credentials"], + "Next": "InvalidCredentials" } ] }, + "InvalidCredentials": { + "Type": "Task", + "Resource": "InvalidCredentials", + "TransitionEvent": ":error", + "Next": "Abort" + }, "Established": { "Type": "Choice", "Resource": "Established", @@ -35,7 +45,7 @@ "Next": "HandleMessage" }, { - "StringEquals": ":good_bye", + "StringEquals": ":goodbye", "Next": "GoodBye" }, { @@ -54,7 +64,7 @@ }, { "StringEquals": ":event", - "Next": "BroadcastEvent" + "Next": "Event" }, { "StringEquals": ":published", @@ -94,6 +104,11 @@ } ] }, + "Event": { + "Type": "Task", + "Resource": "HandleEvent", + "Next": "Established" + }, "Subscribed": { "Type": "Task", "Resource": "HandleSubscribed", @@ -109,11 +124,6 @@ "Resource": "HandleUnsubscribed", "Next": "Established" }, - "Subscribed": { - "Type": "Task", - "Resource": "HandleSubscribed", - "Next": "Established" - }, "Result": { "Type": "Task", "Resource": "HandleResult",