From 649c127b60cbf34c3c9337a2945b9d85fee73a3d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Christopher=20Cot=C3=A9?= Date: Sat, 23 Jan 2021 19:42:35 -0600 Subject: [PATCH] simplify calls, and ensure pid is up --- lib/router/realms/session.ex | 3 +-- lib/router/session.ex | 39 ++++++++++++------------------------ 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/lib/router/realms/session.ex b/lib/router/realms/session.ex index 81784a8..7f838d6 100644 --- a/lib/router/realms/session.ex +++ b/lib/router/realms/session.ex @@ -22,8 +22,7 @@ defmodule Wampex.Router.Realms.Session do :not_found {_key, callees} -> - callee_index = get_index(db, realm, procedure) - {callees, callee_index} + {callees, 0} end end diff --git a/lib/router/session.ex b/lib/router/session.ex index 7773330..4c7cceb 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -438,8 +438,8 @@ defmodule Wampex.Router.Session do {data, actions} = case is_authorized?(am, peer, :call, proc, method) do true -> - with {callees, index} <- RealmSession.callees(db, realm, proc), - {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do + with {callees, _index} <- RealmSession.callees(db, realm, proc), + {id, {pid, node}} <- get_live_callee(proxy, callees, 1000) do req_id = RealmSession.get_request_id(ri) opts = Map.put(opts, "procedure", proc) opts = Map.put(opts, "session_id", session_id) @@ -462,8 +462,6 @@ defmodule Wampex.Router.Session do } ) - RealmSession.round_robin(db, realm, proc, length(callees)) - Logger.info("Call #{req_id}: #{realm} - #{proc}") {%SL{sl | data: %Sess{data | request_id: req_id}}, [{:next_event, :internal, :transition}]} else @@ -783,47 +781,36 @@ defmodule Wampex.Router.Session do defp get_auth_module(auth, _methods), do: auth - defp get_live_callee(_proxy, [], _index, 0) do + defp get_live_callee(_proxy, [], 0) do Logger.error("No live callees, tried all replicas") {:error, :no_live_callees} end - defp get_live_callee(_proxy, [], _index, _) do + defp get_live_callee(_proxy, [], _) do Logger.error("No live callees, empty result from lookup") {:error, :no_live_callees} end - defp get_live_callee(proxy, callees, index, tries) when is_list(callees) do - {_id, {_pid, node}} = c = Enum.at(callees, index, Enum.at(callees, 0)) + defp get_live_callee(proxy, callees, tries) when is_list(callees) do + {_id, {pid, node}} = c = Enum.random(callees) nodes = [Node.self() | Node.list()] - case node in nodes do - true -> - c - + with true <- node in nodes, + true <- GenServer.call({proxy, node}, {:is_up, pid}) do + c + else false -> - index = get_round_robin_index(index, callees) - - get_live_callee(proxy, callees, index, tries - 1) + get_live_callee(proxy, callees, tries - 1) end end - defp get_live_callee(proxy, result, index, tries) do - Logger.error( - "No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Index: #{inspect(index)} Tries: #{tries}" - ) + defp get_live_callee(proxy, result, tries) do + Logger.error("No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Tries: #{tries}") {:error, :no_live_callees} end - defp get_round_robin_index(index, callees) do - case index + 1 do - ni when ni < length(callees) -> ni - _ -> 0 - end - end - defp send_to_peer(msg, transport, pid) do transport.send_request(pid, remove_nil_values(msg)) end -- 2.45.3