serializer: module(),
requests: []
}
+ @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) ::
+ :ok
+ def cast_send_request(name, request, pid) do
+ __MODULE__.cast(name, {:send_request, request, pid})
+ end
@spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
def cast_send_request(name, request) do
{:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []}
end
+ @impl true
+ def handle_cast(
+ {:send_request, request, from},
+ "Established",
+ %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
+ ) do
+ request_id = do_send(r_id, tt, t, request)
+
+ {:ok,
+ %SL{
+ data
+ | data: %Sess{
+ sess
+ | request_id: request_id,
+ requests: [{request_id, from} | sess.requests]
+ }
+ }, []}
+ end
+
@impl true
def handle_cast(
{:send_request, request},
) do
Logger.debug("Handling Message #{inspect(msg)}")
{actions, id, response} = handle_message(msg, roles)
+ Logger.info("Handle Response: #{inspect(response)}")
{%SL{data: sess} = data, response} = maybe_update_response(data, response)
+ Logger.info("Updated...")
{requests, actions} = resp = handle_response(id, actions, requests, response)
+ Logger.info("Response Handled: #{inspect(actions)}")
Logger.debug("Response: #{inspect(resp)}")
{:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
end
{^id, nil}, _ ->
{:halt, {remove_request(id, requests), actions}}
+ {^id, {_, _} = from}, _ ->
+ {:halt, {remove_request(id, requests), get_response_action(from, response, actions)}}
+
{^id, from}, _ ->
- {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
+ {:halt, {requests, get_response_action(from, response, actions)}}
{_, _}, acc ->
{:cont, acc}
end)
end
- defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
- send(from, {:progress, arg_l, arg_kw})
- []
+ defp get_response_action(pid, {:ok, %{"progress" => true}, arg_l, arg_kw}, actions)
+ when is_pid(pid) do
+ Logger.info("Sending Progress: #{inspect(pid)}")
+ send(pid, {:progress, arg_l, arg_kw})
+ Logger.info("Progress Sent")
+ actions
+ end
+
+ defp get_response_action(from, response, actions) when is_pid(from) do
+ send(from, {:reply, response})
+ actions
end
- defp get_response_action(from, response), do: {:reply, from, response}
+ defp get_response_action({_, _} = from, response, actions),
+ do: [{:reply, from, response} | actions]
defp maybe_update_response(data, {:update, key, resp}) do
{%SL{data | data: Map.put(data.data, key, resp)}, resp}
def init({test, name, device}) do
{:ok, reg} = Wampex.register(name, %Register{procedure: "com.actuator.#{device}.light"})
+
+ {:ok, _reg} = Wampex.register(name, %Register{procedure: "com.long.function"})
+
send(test, {:registered, reg})
{:ok, {test, name, reg}}
end
+ def handle_info(
+ {:invocation, id, _, %{"procedure" => "com.long.function"}, _, _arg_kw} = invocation,
+ {test, name, _reg} = state
+ ) do
+ Logger.info("Got invocation #{inspect(invocation)}")
+
+ send(test, {:long, :function})
+
+ Wampex.cast_send_request(
+ name,
+ Callee.yield(%Yield{
+ request_id: id,
+ arg_list: [1, 2, 3, 4, 5, 6, 7],
+ arg_kw: %{progress: 80},
+ options: %{progress: true}
+ })
+ )
+
+ :timer.sleep(1000)
+
+ Wampex.cast_send_request(
+ name,
+ Callee.yield(%Yield{
+ request_id: id,
+ arg_list: [1, 2, 3, 4, 5, 6, 7],
+ arg_kw: %{progress: 80},
+ options: %{progress: true}
+ })
+ )
+
+ Wampex.cast_send_request(
+ name,
+ Callee.yield(%Yield{
+ request_id: id,
+ arg_list: [8, 9, 10, 11],
+ arg_kw: %{done: true}
+ })
+ )
+
+ {:noreply, state}
+ end
+
def handle_info({:invocation, id, _, _, _, arg_kw} = invocation, {test, name, _reg} = state) do
Logger.info("Got invocation #{inspect(invocation)}")
assert_receive {:EXIT, ^pid, :shutdown}
end
+ test "callee sends progress reports" do
+ callee_name = TestCalleeRespond
+ Wampex.start_link(name: callee_name, session: @session)
+ TestCallee.start_link(self(), callee_name, @device)
+ assert_receive {:registered, id}
+ caller_name = TestCaller
+ Wampex.start_link(name: caller_name, session: @session)
+
+ Wampex.cast_send_request(
+ caller_name,
+ Caller.call(%Call{
+ procedure: "com.long.function",
+ arg_list: [1],
+ arg_kw: %{color: "#FFFFFF"},
+ options: %{receive_progress: true}
+ }),
+ self()
+ )
+
+ assert_receive {:long, :function}
+ assert_receive {:progress, _, _}
+ assert_receive {:reply, {:ok, %{"done" => true}, ["ok"], %{"color" => "#FFFFFF"}}}, 2000
+ end
+
test "callee is invoked and responds and caller gets result" do
callee_name = TestCalleeRespond
Wampex.start_link(name: callee_name, session: @session)