From 216b1a963b80fa899cc23d52f9cb4b9713fd42cb Mon Sep 17 00:00:00 2001 From: Christopher Date: Fri, 17 Apr 2020 16:20:23 -0500 Subject: [PATCH] group subscriber events by node and send list when publishing --- lib/router/realms/proxy.ex | 5 +++++ lib/router/session.ex | 21 +++++++++------------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/lib/router/realms/proxy.ex b/lib/router/realms/proxy.ex index 9c8f64f..e1bfb4f 100644 --- a/lib/router/realms/proxy.ex +++ b/lib/router/realms/proxy.ex @@ -10,6 +10,11 @@ defmodule Wampex.Router.Realms.Proxy do {:ok, []} end + def handle_info(messages, state) when is_list(messages) do + Enum.each(messages, fn {e, to} -> send(to, e) end) + {:noreply, state} + end + def handle_info({e, to}, state) do send(to, e) {:noreply, state} diff --git a/lib/router/session.ex b/lib/router/session.ex index 16bf7d6..64e158f 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -310,18 +310,15 @@ defmodule Wampex.Router.Session do subs = RealmSession.subscriptions(db, realm, topic) details = Map.put_new(opts, "topic", topic) - # {%Event{ - # subscription_id: id, - # publication_id: pub_id, - # arg_list: arg_l, - # arg_kw: arg_kw, - # details: details - # }, pid} - Enum.each(subs, fn {id, {pid, node}} -> - send( - {proxy, node}, - {{:event, id, pub_id, arg_l, arg_kw, details}, pid} - ) + + groups = + Enum.reduce(subs, %{}, fn {id, {pid, node}}, acc -> + event = {{:event, id, pub_id, arg_l, arg_kw, details}, pid} + Map.update(acc, node, [event], &[event | &1]) + end) + + Enum.each(groups, fn {node, messages} -> + send({proxy, node}, messages) end) case opts do -- 2.45.3