]> Entropealabs - wampex_router.git/commitdiff
simplify calls, and ensure pid is up
authorChristopher Coté <ccote@cohesionib.com>
Sun, 24 Jan 2021 01:42:35 +0000 (19:42 -0600)
committerChristopher Coté <ccote@cohesionib.com>
Sun, 24 Jan 2021 01:42:35 +0000 (19:42 -0600)
lib/router/realms/session.ex
lib/router/session.ex

index 81784a8fe2b51ca6f31cdddcc33996205f220b87..7f838d6cfe21d32e9b69b7c0c204e11a7301f763 100644 (file)
@@ -22,8 +22,7 @@ defmodule Wampex.Router.Realms.Session do
         :not_found
 
       {_key, callees} ->
-        callee_index = get_index(db, realm, procedure)
-        {callees, callee_index}
+        {callees, 0}
     end
   end
 
index 7773330174c04f2e52bc36e5388e388e9b990e4c..4c7ccebeabf249a41480e7f5166cabebb093c9c8 100644 (file)
@@ -438,8 +438,8 @@ defmodule Wampex.Router.Session do
     {data, actions} =
       case is_authorized?(am, peer, :call, proc, method) do
         true ->
-          with {callees, index} <- RealmSession.callees(db, realm, proc),
-               {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do
+          with {callees, _index} <- RealmSession.callees(db, realm, proc),
+               {id, {pid, node}} <- get_live_callee(proxy, callees, 1000) do
             req_id = RealmSession.get_request_id(ri)
             opts = Map.put(opts, "procedure", proc)
             opts = Map.put(opts, "session_id", session_id)
@@ -462,8 +462,6 @@ defmodule Wampex.Router.Session do
               }
             )
 
-            RealmSession.round_robin(db, realm, proc, length(callees))
-
             Logger.info("Call #{req_id}: #{realm} - #{proc}")
             {%SL{sl | data: %Sess{data | request_id: req_id}}, [{:next_event, :internal, :transition}]}
           else
@@ -783,47 +781,36 @@ defmodule Wampex.Router.Session do
 
   defp get_auth_module(auth, _methods), do: auth
 
-  defp get_live_callee(_proxy, [], _index, 0) do
+  defp get_live_callee(_proxy, [], 0) do
     Logger.error("No live callees, tried all replicas")
     {:error, :no_live_callees}
   end
 
-  defp get_live_callee(_proxy, [], _index, _) do
+  defp get_live_callee(_proxy, [], _) do
     Logger.error("No live callees, empty result from lookup")
     {:error, :no_live_callees}
   end
 
-  defp get_live_callee(proxy, callees, index, tries) when is_list(callees) do
-    {_id, {_pid, node}} = c = Enum.at(callees, index, Enum.at(callees, 0))
+  defp get_live_callee(proxy, callees, tries) when is_list(callees) do
+    {_id, {pid, node}} = c = Enum.random(callees)
 
     nodes = [Node.self() | Node.list()]
 
-    case node in nodes do
-      true ->
-        c
-
+    with true <- node in nodes,
+         true <- GenServer.call({proxy, node}, {:is_up, pid}) do
+      c
+    else
       false ->
-        index = get_round_robin_index(index, callees)
-
-        get_live_callee(proxy, callees, index, tries - 1)
+        get_live_callee(proxy, callees, tries - 1)
     end
   end
 
-  defp get_live_callee(proxy, result, index, tries) do
-    Logger.error(
-      "No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Index: #{inspect(index)} Tries: #{tries}"
-    )
+  defp get_live_callee(proxy, result, tries) do
+    Logger.error("No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Tries: #{tries}")
 
     {:error, :no_live_callees}
   end
 
-  defp get_round_robin_index(index, callees) do
-    case index + 1 do
-      ni when ni < length(callees) -> ni
-      _ -> 0
-    end
-  end
-
   defp send_to_peer(msg, transport, pid) do
     transport.send_request(pid, remove_nil_values(msg))
   end