From e43687ef4f30fff11aca7dbacc6018685d5957b6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Christopher=20Cot=C3=A9?= Date: Wed, 8 Dec 2021 18:32:29 -0500 Subject: [PATCH] purge old callees when invalid --- lib/router/realms/session.ex | 4 ++++ lib/router/session.ex | 16 +++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/lib/router/realms/session.ex b/lib/router/realms/session.ex index 6898809..6d3bddb 100644 --- a/lib/router/realms/session.ex +++ b/lib/router/realms/session.ex @@ -16,6 +16,10 @@ defmodule Wampex.Router.Realms.Session do List.delete(regs, reg) end + def remove_value(db, realm, key, val) do + ClusterKV.update(db, realm, key, val, :remove) + end + def callees(db, realm, procedure) do case ClusterKV.get(db, realm, procedure, 500) do :not_found -> diff --git a/lib/router/session.ex b/lib/router/session.ex index 256ff34..b62efaa 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -437,7 +437,7 @@ defmodule Wampex.Router.Session do case is_authorized?(am, peer, :call, proc, method) do true -> with {callees, _index} <- RealmSession.callees(db, realm, proc), - {id, {pid, node}} <- get_live_callee(proxy, callees, 10) do + {id, {pid, node}} <- get_live_callee(proxy, callees, 10, db, realm, proc) do opts = Map.put(opts, "procedure", proc) opts = Map.put(opts, "session_id", session_id) @@ -787,17 +787,17 @@ defmodule Wampex.Router.Session do defp get_auth_module(auth, _methods), do: auth - defp get_live_callee(_proxy, [], 0) do + defp get_live_callee(_proxy, [], 0, _db, _realm, _proc) do Logger.error("No live callees, tried all replicas") {:error, :no_live_callees} end - defp get_live_callee(_proxy, [], _) do + defp get_live_callee(_proxy, [], _, _, _, _) do Logger.error("No live callees, empty result from lookup") {:error, :no_live_callees} end - defp get_live_callee(proxy, callees, tries) when is_list(callees) do + defp get_live_callee(proxy, callees, tries, db, realm, proc) when is_list(callees) do {_id, {pid, node}} = c = Enum.random(callees) nodes = [Node.self() | Node.list()] @@ -807,12 +807,14 @@ defmodule Wampex.Router.Session do c else false -> - Logger.error("#{inspect(c)} is ian invalid callee") - get_live_callee(proxy, callees, tries - 1) + Logger.error("#{inspect(c)} is an invalid callee, purging...") + RealmSession.remove_value(db, realm, proc, {pid, node}) + callees = List.delete(callees, c) + get_live_callee(proxy, callees, tries - 1, db, realm, proc) end end - defp get_live_callee(proxy, result, tries) do + defp get_live_callee(proxy, result, tries, _, _, _) do Logger.error("No live callees, Proxy: #{inspect(proxy)} Result: #{inspect(result)} Tries: #{tries}") {:error, :no_live_callees} -- 2.45.3