From e44832bc9104ed664ac83b06d61ccf9c6e6588c3 Mon Sep 17 00:00:00 2001 From: Christopher Date: Wed, 1 Apr 2020 17:19:08 -0500 Subject: [PATCH] update wampex lib --- lib/router/session.ex | 44 +++++- mix.exs | 8 +- mix.lock | 4 +- priv/client.json | 128 ------------------ .../20200319210508_create_realms.exs | 29 ---- priv/router.json | 8 +- test/support/test_callee.ex | 16 ++- test/wampex_test.exs | 38 ++++-- 8 files changed, 91 insertions(+), 184 deletions(-) delete mode 100644 priv/client.json delete mode 100644 priv/repo/migrations/20200319210508_create_realms.exs diff --git a/lib/router/session.ex b/lib/router/session.ex index da5a465..9dac0fe 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -5,7 +5,7 @@ defmodule Wampex.Router.Session do use StatesLanguage, data: "priv/router.json" alias StatesLanguage, as: SL - alias Wampex.Roles.{Broker, Caller, Dealer, Peer} + alias Wampex.Roles.{Broker, Dealer, Peer} alias Wampex.Roles.Broker.{Event, Published, Subscribed, Unsubscribed} alias Wampex.Roles.Callee.{Register, Unregister, Yield} alias Wampex.Roles.Caller.Call @@ -96,7 +96,7 @@ defmodule Wampex.Router.Session do @subscribe "Subscribe" @unsubscribe "Unsubscribe" @publish "Publish" - # @error "Error" + @invocation_error "InvocationError" @abort "Abort" @goodbye "Goodbye" @@ -113,6 +113,7 @@ defmodule Wampex.Router.Session do @handle_subscribe "HandleSubscribe" @handle_unsubscribe "HandleUnsubscribe" @handle_publish "HandlePublish" + @handle_invocation_error "HandleInvocationError" # @handle_interrupt "HandleInterrupt" @handle_abort "HandleAbort" @handle_goodbye "HandleGoodbye" @@ -468,7 +469,7 @@ defmodule Wampex.Router.Session do else {:error, :no_live_callees} -> send_to_peer( - Caller.call_error(%Error{ + Dealer.call_error(%Error{ request_id: call_id, error: "wamp.error.no_callees", details: %{procedure: proc} @@ -481,7 +482,7 @@ defmodule Wampex.Router.Session do :not_found -> send_to_peer( - Caller.call_error(%Error{ + Dealer.call_error(%Error{ request_id: call_id, error: "wamp.error.no_registration", details: %{procedure: proc} @@ -533,6 +534,39 @@ defmodule Wampex.Router.Session do {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, [{:next_event, :internal, :transition}]} end + @impl true + def handle_resource( + @handle_invocation_error, + _, + @invocation_error, + %SL{ + data: %Sess{ + proxy: proxy, + invocations: inv, + error: %Error{request_id: id, details: dets, arg_list: arg_l, error: err, arg_kw: arg_kw} + } + } = data + ) do + {call_id, _, {pid, node}} = + Enum.find(inv, fn + {_, %Invocation{request_id: ^id}, _} -> true + _ -> false + end) + + send( + {proxy, node}, + {%Error{request_id: call_id, type: 48, error: err, arg_list: arg_l, arg_kw: arg_kw, details: dets}, pid} + ) + + inv = + Enum.filter(inv, fn + {_, %Invocation{request_id: ^id}, _} -> false + _ -> true + end) + + {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, [{:next_event, :internal, :transition}]} + end + @impl true def handle_resource( @handle_abort, @@ -585,7 +619,7 @@ defmodule Wampex.Router.Session do @impl true def handle_info(%Error{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do - send_to_peer(Caller.call_error(e), tt, t) + send_to_peer(Dealer.call_error(e), tt, t) {:ok, data, []} end diff --git a/mix.exs b/mix.exs index 94fb7d6..6d1acaa 100644 --- a/mix.exs +++ b/mix.exs @@ -38,7 +38,6 @@ defmodule Wampex.Router.MixProject do defp deps do [ - # {:cluster_kv, path: "../cluster_kv"}, {:cluster_kv, git: "https://gitlab.com/entropealabs/cluster_kv.git", tag: "801f732c1cb5a9a9125de34ed2bdf88d2723ed00"}, {:cors_plug, "~> 2.0"}, @@ -50,13 +49,10 @@ defmodule Wampex.Router.MixProject do {:msgpack, "~> 0.7.0"}, {:plug_cowboy, "~> 2.1"}, {:states_language, "~> 0.2"}, - {:wampex, - git: "https://gitlab.com/entropealabs/wampex.git", - tag: "591a03f9ed1585942afc043fe4a73beb7641e1e3", - override: true}, + {:wampex, git: "https://gitlab.com/entropealabs/wampex.git", tag: "c7bb690087f9f14832dcd4f222a8bd92154af78d"}, {:wampex_client, git: "https://gitlab.com/entropealabs/wampex_client.git", - tag: "6861b782ce61df0d12b94953a0fb39281ce499e2", + tag: "40b0f3765cd9c2930d576b203e0e7ab3679f7129", only: [:dev, :test]} ] end diff --git a/mix.lock b/mix.lock index 1e09df0..0dbacfa 100644 --- a/mix.lock +++ b/mix.lock @@ -36,8 +36,8 @@ "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}, - "wampex": {:git, "https://gitlab.com/entropealabs/wampex.git", "591a03f9ed1585942afc043fe4a73beb7641e1e3", [tag: "591a03f9ed1585942afc043fe4a73beb7641e1e3"]}, - "wampex_client": {:git, "https://gitlab.com/entropealabs/wampex_client.git", "6861b782ce61df0d12b94953a0fb39281ce499e2", [tag: "6861b782ce61df0d12b94953a0fb39281ce499e2"]}, + "wampex": {:git, "https://gitlab.com/entropealabs/wampex.git", "c7bb690087f9f14832dcd4f222a8bd92154af78d", [tag: "c7bb690087f9f14832dcd4f222a8bd92154af78d"]}, + "wampex_client": {:git, "https://gitlab.com/entropealabs/wampex_client.git", "40b0f3765cd9c2930d576b203e0e7ab3679f7129", [tag: "40b0f3765cd9c2930d576b203e0e7ab3679f7129"]}, "websockex": {:hex, :websockex, "0.4.2", "9a3b7dc25655517ecd3f8ff7109a77fce94956096b942836cdcfbc7c86603ecc", [:mix], [], "hexpm", "803cd76e91544b56f0e655e36790be797fa6436db9224f7c303db9b9df2a3df4"}, "xema": {:hex, :xema, "0.11.0", "7b5118418633cffc27092110d02d4faeea938149dd3f6c64299e41e747067e80", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "51491c9a953d65069d4b30aa2f70bc45ff99fd1bc3345bc72ce4e644d01ea14e"}, } diff --git a/priv/client.json b/priv/client.json deleted file mode 100644 index b09b052..0000000 --- a/priv/client.json +++ /dev/null @@ -1,128 +0,0 @@ -{ - "Comment": "Session State Machine", - "StartAt": "WaitForTransport", - "States": { - "WaitForTransport": { - "Type": "Task", - "Resource": "InitTransport", - "TransitionEvent": "{:connected, true}", - "Next": "Init", - "Catch": [ - { - "ErrorEquals": ["{:connected, false}"], - "Next": "Abort" - } - ] - }, - "Init": { - "Type": "Task", - "Resource": "Hello", - "TransitionEvent": ":hello_sent", - "Next": "Handshake", - "Catch": [ - { - "ErrorEquals": [":abort"], - "Next": "Abort" - } - - ] - }, - "Handshake": { - "Type": "Choice", - "Resource": "HandleHandshake", - "Choices": [ - { - "StringEquals": ":message_received", - "Next": "Message" - }, - { - "StringEquals": ":abort", - "Next": "Abort" - } - ] - }, - "Established": { - "Type": "Choice", - "Resource": "HandleEstablished", - "Choices": [ - { - "StringEquals": ":message_received", - "Next": "Message" - }, - { - "StringEquals": ":goodbye", - "Next": "GoodBye" - }, - { - "StringEquals": ":abort", - "Next": "Abort" - } - ] - }, - "Message": { - "Type": "Choice", - "Resource": "HandleMessage", - "Choices": [ - { - "StringEquals": ":established", - "Next": "Established" - }, - { - "StringEquals": ":event", - "Next": "Event" - }, - { - "StringEquals": ":invocation", - "Next": "Invocation" - }, - { - "StringEquals": ":abort", - "Next": "Abort" - }, - { - "StringEquals": ":goodbye", - "Next": "GoodBye" - }, - { - "StringEquals": ":challenge", - "Next": "Challenge" - }, - { - "StringEquals": ":interrupt", - "Next": "Interrupt" - } - ] - }, - "Interrupt": { - "Type": "Task", - "Resource": "HandleInterrupt", - "Next": "Established" - }, - "Challenge": { - "Type": "Task", - "Resource": "HandleChallenge", - "Next": "Handshake", - "TransitionEvent": ":challenged" - }, - "Event": { - "Type": "Task", - "Resource": "HandleEvent", - "Next": "Established" - }, - "Invocation": { - "Type": "Task", - "Resource": "HandleInvocation", - "Next": "Established" - }, - "GoodBye": { - "Type": "Task", - "Resource": "GoodBye", - "End": true - }, - "Abort": { - "Type": "Task", - "Resource": "HandleAbort", - "End": true - } - } -} diff --git a/priv/repo/migrations/20200319210508_create_realms.exs b/priv/repo/migrations/20200319210508_create_realms.exs deleted file mode 100644 index d694b1f..0000000 --- a/priv/repo/migrations/20200319210508_create_realms.exs +++ /dev/null @@ -1,29 +0,0 @@ -defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do - use Ecto.Migration - - def change do - create table("realms", primary_key: false) do - add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()")) - add(:uri, :string, null: false) - add(:parent, :string, null: true) - timestamps() - end - - create(unique_index(:realms, [:uri])) - - create table("peers", primary_key: false) do - add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()")) - add(:authid, :string, null: false) - add(:password, :string, null: false) - add(:salt, :string, null: false) - add(:iterations, :integer, null: false) - add(:keylen, :integer, null: false) - add(:cidr, :string) - add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false) - timestamps() - end - - create(index(:peers, [:authid])) - create(unique_index(:peers, [:authid, :realm_id])) - end -end diff --git a/priv/router.json b/priv/router.json index 9f60d81..c9bcbf9 100644 --- a/priv/router.json +++ b/priv/router.json @@ -66,8 +66,8 @@ "Next": "Publish" }, { - "StringEquals": ":error", - "Next": "Error" + "StringEquals": ":invocation_error", + "Next": "InvocationError" }, { "StringEquals": ":abort", @@ -134,9 +134,9 @@ "Resource": "HandlePublish", "Next": "Established" }, - "Error":{ + "InvocationError":{ "Type": "Task", - "Resource": "HandleError", + "Resource": "HandleInvocationError", "Next": "Established" }, "GoodBye": { diff --git a/test/support/test_callee.ex b/test/support/test_callee.ex index df81f8b..ad89fc1 100644 --- a/test/support/test_callee.ex +++ b/test/support/test_callee.ex @@ -4,6 +4,7 @@ defmodule TestCallee do require Logger alias Wampex.Client alias Wampex.Roles.{Callee, Dealer} + alias Wampex.Roles.Peer.Error alias Callee.{Register, Yield} alias Dealer.Invocation @@ -12,11 +13,24 @@ defmodule TestCallee do end def init({test, name, device}) do - {:ok, reg} = Client.register(name, %Register{procedure: "com.actuator.#{device}.light"}) + {:ok, _reg} = Client.register(name, %Register{procedure: "com.actuator.#{device}.light"}) + {:ok, reg} = Client.register(name, %Register{procedure: "return.error"}) send(test, {:registered, reg}) {:ok, {test, name, reg}} end + def handle_info( + %Invocation{request_id: id, details: %{"procedure" => "return.error"}, arg_list: al, arg_kw: akw}, + {_test, name, _reg} = state + ) do + Client.error( + name, + %Error{request_id: id, error: "this.is.an.error", arg_list: al, arg_kw: akw} + ) + + {:noreply, state} + end + def handle_info( %Invocation{request_id: id, details: dets, arg_kw: arg_kw} = invocation, {test, name, _reg} = state diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 6200b9e..6c1687f 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -74,6 +74,26 @@ defmodule WampexTest do ) end + @tag :client + test "caller receives error when callee returns an error" do + callee_name = TestErrorCalleeRespond + Client.start_link(name: callee_name, session: @session) + TestCallee.start_link(self(), callee_name, @device) + assert_receive {_, _} + caller_name = TestErrorCaller + Client.start_link(name: caller_name, session: @session) + + assert %Error{error: "this.is.an.error", arg_list: [1], arg_kw: %{"color" => "#FFFFFF"}} = + Client.call( + caller_name, + %Call{ + procedure: "return.error", + arg_list: [1], + arg_kw: %{color: "#FFFFFF"} + } + ) + end + @tag :client test "callee is invoked and responds and caller gets result" do callee_name = TestCalleeRespond @@ -83,15 +103,15 @@ defmodule WampexTest do caller_name = TestCaller Client.start_link(name: caller_name, session: @session) - %Result{arg_list: ["ok"], arg_kw: %{"color" => "#FFFFFF"}} = - Client.call( - caller_name, - %Call{ - procedure: "com.actuator.#{@device}.light", - arg_list: [1], - arg_kw: %{color: "#FFFFFF"} - } - ) + assert %Result{arg_list: ["ok"], arg_kw: %{"color" => "#FFFFFF"}} = + Client.call( + caller_name, + %Call{ + procedure: "com.actuator.#{@device}.light", + arg_list: [1], + arg_kw: %{color: "#FFFFFF"} + } + ) assert_receive %Invocation{} end -- 2.45.3