]> Entropealabs - wampex_router.git/commitdiff
purge old callees when invalid
authorChristopher Coté <chris@entropealabs.com>
Wed, 8 Dec 2021 23:32:29 +0000 (18:32 -0500)
committerChristopher Coté <chris@entropealabs.com>
Wed, 8 Dec 2021 23:32:29 +0000 (18:32 -0500)
lib/router/realms/session.ex
lib/router/session.ex

index 68988098bd51f1e5a8f04877bd2c59aa682f9e8e..6d3bddb2e1858fcd890432f77146f4ae6cb134a5 100644 (file)
@@ -16,6 +16,10 @@ defmodule Wampex.Router.Realms.Session do
     List.delete(regs, reg)
   end
 
+  def remove_value(db, realm, key, val) do
+    ClusterKV.update(db, realm, key, val, :remove)
+  end
+
   def callees(db, realm, procedure) do
     case ClusterKV.get(db, realm, procedure, 500) do
       :not_found ->
index 256ff3456c39c2b23b736e6b4efed55be301bebf..b62efaaec4a990b69fbf9220838dc76f1210efd1 100644 (file)
@@ -437,7 +437,7 @@ defmodule Wampex.Router.Session do
       case is_authorized?(am, peer, :call, proc, method) do
         true ->
           with {callees, _index} <- RealmSession.callees(db, realm, proc),
-               {id, {pid, node}} <- get_live_callee(proxy, callees, 10) do
+               {id, {pid, node}} <- get_live_callee(proxy, callees, 10, db, realm, proc) do
             opts = Map.put(opts, "procedure", proc)
             opts = Map.put(opts, "session_id", session_id)
 
@@ -787,17 +787,17 @@ defmodule Wampex.Router.Session do
 
   defp get_auth_module(auth, _methods), do: auth
 
-  defp get_live_callee(_proxy, [], 0) do
+  defp get_live_callee(_proxy, [], 0, _db, _realm, _proc) do
     Logger.error("No live callees, tried all replicas")
     {:error, :no_live_callees}
   end
 
-  defp get_live_callee(_proxy, [], _) do
+  defp get_live_callee(_proxy, [], _, _, _, _) do
     Logger.error("No live callees, empty result from lookup")
     {:error, :no_live_callees}
   end
 
-  defp get_live_callee(proxy, callees, tries) when is_list(callees) do
+  defp get_live_callee(proxy, callees, tries, db, realm, proc) when is_list(callees) do
     {_id, {pid, node}} = c = Enum.random(callees)
 
     nodes = [Node.self() | Node.list()]
@@ -807,12 +807,14 @@ defmodule Wampex.Router.Session do
       c
     else
       false ->
-        Logger.error("#{inspect(c)} is ian invalid callee")
-        get_live_callee(proxy, callees, tries - 1)
+        Logger.error("#{inspect(c)} is an invalid callee, purging...")
+        RealmSession.remove_value(db, realm, proc, {pid, node})
+        callees = List.delete(callees, c)
+        get_live_callee(proxy, callees, tries - 1, db, realm, proc)
     end
   end
 
-  defp get_live_callee(proxy, result, tries) do
+  defp get_live_callee(proxy, result, tries, _, _, _) do
     Logger.error("No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Tries: #{tries}")
 
     {:error, :no_live_callees}