From: Christopher Coté Date: Wed, 8 Dec 2021 01:36:45 +0000 (-0500) Subject: get Invocation request_id from callee session X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=5566d7d42d440057269e8b7fa1b92b0d466d57b6;p=wampex_router.git get Invocation request_id from callee session --- diff --git a/lib/router/session.ex b/lib/router/session.ex index 66de4b5..33e018c 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -419,28 +419,25 @@ defmodule Wampex.Router.Session do _, @call, %SL{ - data: - %Sess{ - db: db, - proxy: proxy, - transport: tt, - transport_pid: t, - realm: realm, - request_id: ri, - challenge: %Challenge{auth_method: method}, - authorization_module: am, - id: session_id, - peer: peer, - call: %Call{request_id: call_id, options: opts, procedure: proc, arg_list: al, arg_kw: akw} - } = data + data: %Sess{ + db: db, + proxy: proxy, + transport: tt, + transport_pid: t, + realm: realm, + challenge: %Challenge{auth_method: method}, + authorization_module: am, + id: session_id, + peer: peer, + call: %Call{request_id: call_id, options: opts, procedure: proc, arg_list: al, arg_kw: akw} + } } = sl ) do {data, actions} = case is_authorized?(am, peer, :call, proc, method) do true -> with {callees, _index} <- RealmSession.callees(db, realm, proc), - {id, {pid, node}} <- get_live_callee(proxy, callees, 1000) do - req_id = RealmSession.get_request_id(ri) + {id, {pid, node}} <- get_live_callee(callees) do opts = Map.put(opts, "procedure", proc) opts = Map.put(opts, "session_id", session_id) @@ -450,7 +447,7 @@ defmodule Wampex.Router.Session do { call_id, %Invocation{ - request_id: req_id, + request_id: 0, registration_id: id, details: opts, arg_list: al, @@ -462,8 +459,7 @@ defmodule Wampex.Router.Session do } ) - Logger.info("Call #{req_id}: #{realm} - #{proc}") - {%SL{sl | data: %Sess{data | request_id: req_id}}, [{:next_event, :internal, :transition}]} + {sl, [{:next_event, :internal, :transition}]} else {:error, :no_live_callees} -> send_to_peer( @@ -612,10 +608,13 @@ defmodule Wampex.Router.Session do def handle_info( {call_id, %Invocation{} = e, from}, _, - %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data + %SL{data: %Sess{realm: realm, request_id: rid, transport: tt, transport_pid: t, invocations: inv}} = data ) do + req_id = RealmSession.get_request_id(rid) + Logger.info("Call #{req_id}: #{realm} - #{e.details["procedure"]}") + e = %Invocation{e | request_id: req_id} send_to_peer(Dealer.invocation(e), tt, t) - {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []} + {:ok, %SL{data | data: %Sess{data.data | request_id: req_id, invocations: [{call_id, e, from} | inv]}}, []} end @impl true @@ -786,34 +785,13 @@ defmodule Wampex.Router.Session do defp get_auth_module(auth, _methods), do: auth - defp get_live_callee(_proxy, [], 0) do - Logger.error("No live callees, tried all replicas") - {:error, :no_live_callees} - end - - defp get_live_callee(_proxy, [], _) do + defp get_live_callee([]) do Logger.error("No live callees, empty result from lookup") {:error, :no_live_callees} end - defp get_live_callee(proxy, callees, tries) when is_list(callees) do - {_id, {pid, node}} = c = Enum.random(callees) - - nodes = [Node.self() | Node.list()] - - with true <- node in nodes, - true <- GenServer.call({proxy, node}, {:is_up, pid}) do - c - else - false -> - get_live_callee(proxy, callees, tries - 1) - end - end - - defp get_live_callee(proxy, result, tries) do - Logger.error("No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Tries: #{tries}") - - {:error, :no_live_callees} + defp get_live_callee(callees) do + Enum.random(callees) end defp send_to_peer(msg, transport, pid) do