]> Entropealabs - wampex.git/commitdiff
initial support for progressive calls
authorChristopher <chris@entropealabs.com>
Fri, 21 Feb 2020 18:15:24 +0000 (12:15 -0600)
committerChristopher <chris@entropealabs.com>
Fri, 21 Feb 2020 18:15:24 +0000 (12:15 -0600)
lib/wampex.ex
lib/wampex/session.ex
test/support/test_callee.ex
test/wampex_test.exs

index 6fdeea0bb267fa0f4734703d04eecfc79b2936eb..bc7368e91afe85574dc2373df16988baf9f59192 100644 (file)
@@ -87,6 +87,11 @@ defmodule Wampex do
     Sess.send_request(session_name(name), request, timeout)
   end
 
+  @spec cast_send_request(name :: module(), message(), from :: pid()) :: :ok
+  def cast_send_request(name, request, pid) do
+    Sess.cast_send_request(session_name(name), request, pid)
+  end
+
   @spec cast_send_request(name :: module(), message()) :: :ok
   def cast_send_request(name, request) do
     Sess.cast_send_request(session_name(name), request)
index 0d98e5d20011431ab6eb21c85bce84610222f435..a71f4a8bad9e2c2bfeddc8e084708c5861c29fe5 100644 (file)
@@ -60,6 +60,11 @@ defmodule Wampex.Session do
           serializer: module(),
           requests: []
         }
+  @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) ::
+          :ok
+  def cast_send_request(name, request, pid) do
+    __MODULE__.cast(name, {:send_request, request, pid})
+  end
 
   @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
   def cast_send_request(name, request) do
@@ -103,6 +108,25 @@ defmodule Wampex.Session do
     {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []}
   end
 
+  @impl true
+  def handle_cast(
+        {:send_request, request, from},
+        "Established",
+        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
+      ) do
+    request_id = do_send(r_id, tt, t, request)
+
+    {:ok,
+     %SL{
+       data
+       | data: %Sess{
+           sess
+           | request_id: request_id,
+             requests: [{request_id, from} | sess.requests]
+         }
+     }, []}
+  end
+
   @impl true
   def handle_cast(
         {:send_request, request},
@@ -197,8 +221,11 @@ defmodule Wampex.Session do
       ) do
     Logger.debug("Handling Message #{inspect(msg)}")
     {actions, id, response} = handle_message(msg, roles)
+    Logger.info("Handle Response: #{inspect(response)}")
     {%SL{data: sess} = data, response} = maybe_update_response(data, response)
+    Logger.info("Updated...")
     {requests, actions} = resp = handle_response(id, actions, requests, response)
+    Logger.info("Response Handled: #{inspect(actions)}")
     Logger.debug("Response: #{inspect(resp)}")
     {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
   end
@@ -306,20 +333,32 @@ defmodule Wampex.Session do
       {^id, nil}, _ ->
         {:halt, {remove_request(id, requests), actions}}
 
+      {^id, {_, _} = from}, _ ->
+        {:halt, {remove_request(id, requests), get_response_action(from, response, actions)}}
+
       {^id, from}, _ ->
-        {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
+        {:halt, {requests, get_response_action(from, response, actions)}}
 
       {_, _}, acc ->
         {:cont, acc}
     end)
   end
 
-  defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
-    send(from, {:progress, arg_l, arg_kw})
-    []
+  defp get_response_action(pid, {:ok, %{"progress" => true}, arg_l, arg_kw}, actions)
+       when is_pid(pid) do
+    Logger.info("Sending Progress: #{inspect(pid)}")
+    send(pid, {:progress, arg_l, arg_kw})
+    Logger.info("Progress Sent")
+    actions
+  end
+
+  defp get_response_action(from, response, actions) when is_pid(from) do
+    send(from, {:reply, response})
+    actions
   end
 
-  defp get_response_action(from, response), do: {:reply, from, response}
+  defp get_response_action({_, _} = from, response, actions),
+    do: [{:reply, from, response} | actions]
 
   defp maybe_update_response(data, {:update, key, resp}) do
     {%SL{data | data: Map.put(data.data, key, resp)}, resp}
index 9810705c7e4ca574855cc1a6b81dea7f66091c92..9c46daa7b0a7fbc1fad3d169b1ebb2ed6cf1f80a 100644 (file)
@@ -11,10 +11,55 @@ defmodule TestCallee do
 
   def init({test, name, device}) do
     {:ok, reg} = Wampex.register(name, %Register{procedure: "com.actuator.#{device}.light"})
+
+    {:ok, _reg} = Wampex.register(name, %Register{procedure: "com.long.function"})
+
     send(test, {:registered, reg})
     {:ok, {test, name, reg}}
   end
 
+  def handle_info(
+        {:invocation, id, _, %{"procedure" => "com.long.function"}, _, _arg_kw} = invocation,
+        {test, name, _reg} = state
+      ) do
+    Logger.info("Got invocation #{inspect(invocation)}")
+
+    send(test, {:long, :function})
+
+    Wampex.cast_send_request(
+      name,
+      Callee.yield(%Yield{
+        request_id: id,
+        arg_list: [1, 2, 3, 4, 5, 6, 7],
+        arg_kw: %{progress: 80},
+        options: %{progress: true}
+      })
+    )
+
+    :timer.sleep(1000)
+
+    Wampex.cast_send_request(
+      name,
+      Callee.yield(%Yield{
+        request_id: id,
+        arg_list: [1, 2, 3, 4, 5, 6, 7],
+        arg_kw: %{progress: 80},
+        options: %{progress: true}
+      })
+    )
+
+    Wampex.cast_send_request(
+      name,
+      Callee.yield(%Yield{
+        request_id: id,
+        arg_list: [8, 9, 10, 11],
+        arg_kw: %{done: true}
+      })
+    )
+
+    {:noreply, state}
+  end
+
   def handle_info({:invocation, id, _, _, _, arg_kw} = invocation, {test, name, _reg} = state) do
     Logger.info("Got invocation #{inspect(invocation)}")
 
index 8fa9dc2a7c50615370faeecafca2cb2415ffaa43..247ba4acb4934e148c0b5e5daf62f9b291165ae0 100644 (file)
@@ -165,6 +165,30 @@ defmodule WampexTest do
     assert_receive {:EXIT, ^pid, :shutdown}
   end
 
+  test "callee sends progress reports" do
+    callee_name = TestCalleeRespond
+    Wampex.start_link(name: callee_name, session: @session)
+    TestCallee.start_link(self(), callee_name, @device)
+    assert_receive {:registered, id}
+    caller_name = TestCaller
+    Wampex.start_link(name: caller_name, session: @session)
+
+    Wampex.cast_send_request(
+      caller_name,
+      Caller.call(%Call{
+        procedure: "com.long.function",
+        arg_list: [1],
+        arg_kw: %{color: "#FFFFFF"},
+        options: %{receive_progress: true}
+      }),
+      self()
+    )
+
+    assert_receive {:long, :function}
+    assert_receive {:progress, _, _}
+    assert_receive {:reply, {:ok, %{"done" => true}, ["ok"], %{"color" => "#FFFFFF"}}}, 2000
+  end
+
   test "callee is invoked and responds and caller gets result" do
     callee_name = TestCalleeRespond
     Wampex.start_link(name: callee_name, session: @session)