]> Entropealabs - wampex_router.git/commitdiff
group subscriber events by node and send list when publishing
authorChristopher <chris@entropealabs.com>
Fri, 17 Apr 2020 21:20:23 +0000 (16:20 -0500)
committerChristopher <chris@entropealabs.com>
Fri, 17 Apr 2020 21:20:23 +0000 (16:20 -0500)
lib/router/realms/proxy.ex
lib/router/session.ex

index 9c8f64f0ee650f502cd5eb08d4dd3ce5cc1811d5..e1bfb4f8d07d9bab4c8cab2de56fb2e8c258b78a 100644 (file)
@@ -10,6 +10,11 @@ defmodule Wampex.Router.Realms.Proxy do
     {:ok, []}
   end
 
+  def handle_info(messages, state) when is_list(messages) do
+    Enum.each(messages, fn {e, to} -> send(to, e) end)
+    {:noreply, state}
+  end
+
   def handle_info({e, to}, state) do
     send(to, e)
     {:noreply, state}
index 16bf7d61497852daae39d6f2c47af631f6c28ab2..64e158f6e2e9674c65afa0cc129da642c4ea7b48 100644 (file)
@@ -310,18 +310,15 @@ defmodule Wampex.Router.Session do
 
           subs = RealmSession.subscriptions(db, realm, topic)
           details = Map.put_new(opts, "topic", topic)
-          # {%Event{
-          #       subscription_id: id,
-          #       publication_id: pub_id,
-          #       arg_list: arg_l,
-          #       arg_kw: arg_kw,
-          #       details: details
-          #     }, pid}
-          Enum.each(subs, fn {id, {pid, node}} ->
-            send(
-              {proxy, node},
-              {{:event, id, pub_id, arg_l, arg_kw, details}, pid}
-            )
+
+          groups =
+            Enum.reduce(subs, %{}, fn {id, {pid, node}}, acc ->
+              event = {{:event, id, pub_id, arg_l, arg_kw, details}, pid}
+              Map.update(acc, node, [event], &[event | &1])
+            end)
+
+          Enum.each(groups, fn {node, messages} ->
+            send({proxy, node}, messages)
           end)
 
           case opts do