From 6e8789accfe21cd8842f5c72087ab4b6feef5b27 Mon Sep 17 00:00:00 2001 From: Christopher Date: Fri, 21 Feb 2020 12:15:24 -0600 Subject: [PATCH] initial support for progressive calls --- lib/wampex.ex | 5 ++++ lib/wampex/session.ex | 49 +++++++++++++++++++++++++++++++++---- test/support/test_callee.ex | 45 ++++++++++++++++++++++++++++++++++ test/wampex_test.exs | 24 ++++++++++++++++++ 4 files changed, 118 insertions(+), 5 deletions(-) diff --git a/lib/wampex.ex b/lib/wampex.ex index 6fdeea0..bc7368e 100644 --- a/lib/wampex.ex +++ b/lib/wampex.ex @@ -87,6 +87,11 @@ defmodule Wampex do Sess.send_request(session_name(name), request, timeout) end + @spec cast_send_request(name :: module(), message(), from :: pid()) :: :ok + def cast_send_request(name, request, pid) do + Sess.cast_send_request(session_name(name), request, pid) + end + @spec cast_send_request(name :: module(), message()) :: :ok def cast_send_request(name, request) do Sess.cast_send_request(session_name(name), request) diff --git a/lib/wampex/session.ex b/lib/wampex/session.ex index 0d98e5d..a71f4a8 100644 --- a/lib/wampex/session.ex +++ b/lib/wampex/session.ex @@ -60,6 +60,11 @@ defmodule Wampex.Session do serializer: module(), requests: [] } + @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) :: + :ok + def cast_send_request(name, request, pid) do + __MODULE__.cast(name, {:send_request, request, pid}) + end @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok def cast_send_request(name, request) do @@ -103,6 +108,25 @@ defmodule Wampex.Session do {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []} end + @impl true + def handle_cast( + {:send_request, request, from}, + "Established", + %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data + ) do + request_id = do_send(r_id, tt, t, request) + + {:ok, + %SL{ + data + | data: %Sess{ + sess + | request_id: request_id, + requests: [{request_id, from} | sess.requests] + } + }, []} + end + @impl true def handle_cast( {:send_request, request}, @@ -197,8 +221,11 @@ defmodule Wampex.Session do ) do Logger.debug("Handling Message #{inspect(msg)}") {actions, id, response} = handle_message(msg, roles) + Logger.info("Handle Response: #{inspect(response)}") {%SL{data: sess} = data, response} = maybe_update_response(data, response) + Logger.info("Updated...") {requests, actions} = resp = handle_response(id, actions, requests, response) + Logger.info("Response Handled: #{inspect(actions)}") Logger.debug("Response: #{inspect(resp)}") {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions} end @@ -306,20 +333,32 @@ defmodule Wampex.Session do {^id, nil}, _ -> {:halt, {remove_request(id, requests), actions}} + {^id, {_, _} = from}, _ -> + {:halt, {remove_request(id, requests), get_response_action(from, response, actions)}} + {^id, from}, _ -> - {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}} + {:halt, {requests, get_response_action(from, response, actions)}} {_, _}, acc -> {:cont, acc} end) end - defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do - send(from, {:progress, arg_l, arg_kw}) - [] + defp get_response_action(pid, {:ok, %{"progress" => true}, arg_l, arg_kw}, actions) + when is_pid(pid) do + Logger.info("Sending Progress: #{inspect(pid)}") + send(pid, {:progress, arg_l, arg_kw}) + Logger.info("Progress Sent") + actions + end + + defp get_response_action(from, response, actions) when is_pid(from) do + send(from, {:reply, response}) + actions end - defp get_response_action(from, response), do: {:reply, from, response} + defp get_response_action({_, _} = from, response, actions), + do: [{:reply, from, response} | actions] defp maybe_update_response(data, {:update, key, resp}) do {%SL{data | data: Map.put(data.data, key, resp)}, resp} diff --git a/test/support/test_callee.ex b/test/support/test_callee.ex index 9810705..9c46daa 100644 --- a/test/support/test_callee.ex +++ b/test/support/test_callee.ex @@ -11,10 +11,55 @@ defmodule TestCallee do def init({test, name, device}) do {:ok, reg} = Wampex.register(name, %Register{procedure: "com.actuator.#{device}.light"}) + + {:ok, _reg} = Wampex.register(name, %Register{procedure: "com.long.function"}) + send(test, {:registered, reg}) {:ok, {test, name, reg}} end + def handle_info( + {:invocation, id, _, %{"procedure" => "com.long.function"}, _, _arg_kw} = invocation, + {test, name, _reg} = state + ) do + Logger.info("Got invocation #{inspect(invocation)}") + + send(test, {:long, :function}) + + Wampex.cast_send_request( + name, + Callee.yield(%Yield{ + request_id: id, + arg_list: [1, 2, 3, 4, 5, 6, 7], + arg_kw: %{progress: 80}, + options: %{progress: true} + }) + ) + + :timer.sleep(1000) + + Wampex.cast_send_request( + name, + Callee.yield(%Yield{ + request_id: id, + arg_list: [1, 2, 3, 4, 5, 6, 7], + arg_kw: %{progress: 80}, + options: %{progress: true} + }) + ) + + Wampex.cast_send_request( + name, + Callee.yield(%Yield{ + request_id: id, + arg_list: [8, 9, 10, 11], + arg_kw: %{done: true} + }) + ) + + {:noreply, state} + end + def handle_info({:invocation, id, _, _, _, arg_kw} = invocation, {test, name, _reg} = state) do Logger.info("Got invocation #{inspect(invocation)}") diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 8fa9dc2..247ba4a 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -165,6 +165,30 @@ defmodule WampexTest do assert_receive {:EXIT, ^pid, :shutdown} end + test "callee sends progress reports" do + callee_name = TestCalleeRespond + Wampex.start_link(name: callee_name, session: @session) + TestCallee.start_link(self(), callee_name, @device) + assert_receive {:registered, id} + caller_name = TestCaller + Wampex.start_link(name: caller_name, session: @session) + + Wampex.cast_send_request( + caller_name, + Caller.call(%Call{ + procedure: "com.long.function", + arg_list: [1], + arg_kw: %{color: "#FFFFFF"}, + options: %{receive_progress: true} + }), + self() + ) + + assert_receive {:long, :function} + assert_receive {:progress, _, _} + assert_receive {:reply, {:ok, %{"done" => true}, ["ok"], %{"color" => "#FFFFFF"}}}, 2000 + end + test "callee is invoked and responds and caller gets result" do callee_name = TestCalleeRespond Wampex.start_link(name: callee_name, session: @session) -- 2.45.3