]> Entropealabs - wampex.git/commitdiff
use send_to_peer, default error value
authorChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 20:40:40 +0000 (15:40 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 20:40:40 +0000 (15:40 -0500)
lib/router/session.ex

index 2f515c4dba1677089f79c627a3b83dc0ea85db1a..b7c19f4487638a1bc87e925d4b5f74dc1f697167 100644 (file)
@@ -32,11 +32,11 @@ defmodule Wampex.Router.Session do
     :subscribe,
     :unsubscribe,
     :publish,
-    :error,
     :realm,
     :name,
     :goodbye,
     :peer_information,
+    error: "wamp.error.protocol_violation",
     roles: [Peer, Broker, Dealer],
     registrations: [],
     subscriptions: [],
@@ -174,7 +174,14 @@ defmodule Wampex.Router.Session do
         } = sl
       ) do
     # TODO check for authentication request
-    send_to_peer(Peer.welcome(%Welcome{session_id: id}), tt, t)
+    send_to_peer(
+      Peer.welcome(%Welcome{
+        session_id: id,
+        options: %{agent: "WAMPex Router", roles: %{broker: %{}, dealer: %{}}}
+      }),
+      tt,
+      t
+    )
 
     {:ok,
      %SL{sl | data: %Sess{data | hello_received: true, realm: realm, peer_information: dets}},
@@ -235,7 +242,7 @@ defmodule Wampex.Router.Session do
         ClusterKV.put(db, key, {id, {self(), Node.self()}})
     end
 
-    tt.send_request(t, Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}))
+    send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
 
     {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}},
      [{:next_event, :internal, :transition}]}
@@ -253,7 +260,7 @@ defmodule Wampex.Router.Session do
             transport: tt,
             transport_pid: t,
             db: db,
-            publish: {:publish, id, opts, topic, arg_l, arg_kw}
+            publish: {:publish, rid, opts, topic, arg_l, arg_kw}
           }
         } = sl
       ) do
@@ -282,6 +289,14 @@ defmodule Wampex.Router.Session do
       )
     end)
 
+    case opts do
+      %{acknowledge: true} ->
+        send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t)
+
+      %{} ->
+        :noop
+    end
+
     {:ok, sl, [{:next_event, :internal, :transition}]}
   end
 
@@ -294,24 +309,17 @@ defmodule Wampex.Router.Session do
           data:
             %Sess{
               db: db,
+              transport: tt,
+              transport_pid: t,
               registrations: regs,
               unregister: {:unregister, request_id, registration_id}
             } = data
         } = sl
       ) do
-    {id, proc} =
-      reg =
-      Enum.find(regs, fn
-        {^registration_id, proc} -> true
-        _ -> false
-      end)
+    regs = remove_registration(regs, registration_id, db)
 
-    filter = fn {id, values}, value ->
-      {id, List.delete(values, value)}
-    end
-
-    ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
-    {:ok, %SL{sl | data: %Sess{data | registrations: List.delete(regs, reg)}}}
+    send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
+    {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
   end
 
   @impl true
@@ -335,7 +343,7 @@ defmodule Wampex.Router.Session do
     debug("Handling Registration #{inspect(key)}")
     id = get_global_id()
     ClusterKV.put(db, key, {id, {self(), Node.self()}})
-    tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id}))
+    send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t)
 
     {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
      [{:next_event, :internal, :transition}]}
@@ -362,11 +370,11 @@ defmodule Wampex.Router.Session do
     key = "#{realm}:#{proc}"
     {_key, callees} = ClusterKV.get(db, key, 500)
 
-    req_id = get_request_id(ri)
-
-    actions =
+    {data, actions} =
       case get_live_callee(proxy, callees) do
         {id, {pid, node}} ->
+          req_id = get_request_id(ri)
+
           send(
             {proxy, node},
             {
@@ -385,13 +393,15 @@ defmodule Wampex.Router.Session do
             }
           )
 
-          [{:next_event, :internal, :transition}]
+          {%SL{sl | data: %Sess{data | request_id: req_id}},
+           [{:next_event, :internal, :transition}]}
 
-        {:error, :no_live_callees} = er ->
-          [{:next_event, :internal, er}]
+        {:error, :no_live_callees} ->
+          {%SL{sl | data: %Sess{data | error: "wamp.error.no_callees_registered"}},
+           [{:next_event, :internal, :transition}, {:next_event, :internal, :error}]}
       end
 
-    {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions}
+    {:ok, data, actions}
   end
 
   @impl true
@@ -497,14 +507,8 @@ defmodule Wampex.Router.Session do
           subscriptions: subs
         }
       }) do
-    filter = fn {id, values}, value ->
-      {id, List.delete(values, value)}
-    end
-
-    me = {self(), Node.self()}
-
     Enum.each(regs, fn {id, proc} ->
-      ClusterKV.update(db, proc, {id, me}, filter)
+      remove_registration(regs, id, db)
     end)
 
     send_to_peer(
@@ -514,6 +518,22 @@ defmodule Wampex.Router.Session do
     )
   end
 
+  defp remove_registration(regs, registration_id, db) do
+    {id, proc} =
+      reg =
+      Enum.find(regs, fn
+        {^registration_id, proc} -> true
+        _ -> false
+      end)
+
+    filter = fn {id, values}, value ->
+      {id, List.delete(values, value)}
+    end
+
+    ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
+    List.delete(regs, reg)
+  end
+
   defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
 
   defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do