From: Christopher Date: Fri, 17 Apr 2020 21:20:23 +0000 (-0500) Subject: group subscriber events by node and send list when publishing X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=216b1a963b80fa899cc23d52f9cb4b9713fd42cb;p=wampex_router.git group subscriber events by node and send list when publishing --- diff --git a/lib/router/realms/proxy.ex b/lib/router/realms/proxy.ex index 9c8f64f..e1bfb4f 100644 --- a/lib/router/realms/proxy.ex +++ b/lib/router/realms/proxy.ex @@ -10,6 +10,11 @@ defmodule Wampex.Router.Realms.Proxy do {:ok, []} end + def handle_info(messages, state) when is_list(messages) do + Enum.each(messages, fn {e, to} -> send(to, e) end) + {:noreply, state} + end + def handle_info({e, to}, state) do send(to, e) {:noreply, state} diff --git a/lib/router/session.ex b/lib/router/session.ex index 16bf7d6..64e158f 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -310,18 +310,15 @@ defmodule Wampex.Router.Session do subs = RealmSession.subscriptions(db, realm, topic) details = Map.put_new(opts, "topic", topic) - # {%Event{ - # subscription_id: id, - # publication_id: pub_id, - # arg_list: arg_l, - # arg_kw: arg_kw, - # details: details - # }, pid} - Enum.each(subs, fn {id, {pid, node}} -> - send( - {proxy, node}, - {{:event, id, pub_id, arg_l, arg_kw, details}, pid} - ) + + groups = + Enum.reduce(subs, %{}, fn {id, {pid, node}}, acc -> + event = {{:event, id, pub_id, arg_l, arg_kw, details}, pid} + Map.update(acc, node, [event], &[event | &1]) + end) + + Enum.each(groups, fn {node, messages} -> + send({proxy, node}, messages) end) case opts do