From 6e6069c529d8d0f9b143f05a8a0ad4ff4421175a Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 16 Mar 2020 15:40:40 -0500 Subject: [PATCH] use send_to_peer, default error value --- lib/router/session.ex | 82 +++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/lib/router/session.ex b/lib/router/session.ex index 2f515c4..b7c19f4 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -32,11 +32,11 @@ defmodule Wampex.Router.Session do :subscribe, :unsubscribe, :publish, - :error, :realm, :name, :goodbye, :peer_information, + error: "wamp.error.protocol_violation", roles: [Peer, Broker, Dealer], registrations: [], subscriptions: [], @@ -174,7 +174,14 @@ defmodule Wampex.Router.Session do } = sl ) do # TODO check for authentication request - send_to_peer(Peer.welcome(%Welcome{session_id: id}), tt, t) + send_to_peer( + Peer.welcome(%Welcome{ + session_id: id, + options: %{agent: "WAMPex Router", roles: %{broker: %{}, dealer: %{}}} + }), + tt, + t + ) {:ok, %SL{sl | data: %Sess{data | hello_received: true, realm: realm, peer_information: dets}}, @@ -235,7 +242,7 @@ defmodule Wampex.Router.Session do ClusterKV.put(db, key, {id, {self(), Node.self()}}) end - tt.send_request(t, Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id})) + send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t) {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}}, [{:next_event, :internal, :transition}]} @@ -253,7 +260,7 @@ defmodule Wampex.Router.Session do transport: tt, transport_pid: t, db: db, - publish: {:publish, id, opts, topic, arg_l, arg_kw} + publish: {:publish, rid, opts, topic, arg_l, arg_kw} } } = sl ) do @@ -282,6 +289,14 @@ defmodule Wampex.Router.Session do ) end) + case opts do + %{acknowledge: true} -> + send_to_peer(Broker.published(%Published{request_id: rid, publication_id: pub_id}), tt, t) + + %{} -> + :noop + end + {:ok, sl, [{:next_event, :internal, :transition}]} end @@ -294,24 +309,17 @@ defmodule Wampex.Router.Session do data: %Sess{ db: db, + transport: tt, + transport_pid: t, registrations: regs, unregister: {:unregister, request_id, registration_id} } = data } = sl ) do - {id, proc} = - reg = - Enum.find(regs, fn - {^registration_id, proc} -> true - _ -> false - end) + regs = remove_registration(regs, registration_id, db) - filter = fn {id, values}, value -> - {id, List.delete(values, value)} - end - - ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter) - {:ok, %SL{sl | data: %Sess{data | registrations: List.delete(regs, reg)}}} + send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t) + {:ok, %SL{sl | data: %Sess{data | registrations: regs}}} end @impl true @@ -335,7 +343,7 @@ defmodule Wampex.Router.Session do debug("Handling Registration #{inspect(key)}") id = get_global_id() ClusterKV.put(db, key, {id, {self(), Node.self()}}) - tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id})) + send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t) {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}}, [{:next_event, :internal, :transition}]} @@ -362,11 +370,11 @@ defmodule Wampex.Router.Session do key = "#{realm}:#{proc}" {_key, callees} = ClusterKV.get(db, key, 500) - req_id = get_request_id(ri) - - actions = + {data, actions} = case get_live_callee(proxy, callees) do {id, {pid, node}} -> + req_id = get_request_id(ri) + send( {proxy, node}, { @@ -385,13 +393,15 @@ defmodule Wampex.Router.Session do } ) - [{:next_event, :internal, :transition}] + {%SL{sl | data: %Sess{data | request_id: req_id}}, + [{:next_event, :internal, :transition}]} - {:error, :no_live_callees} = er -> - [{:next_event, :internal, er}] + {:error, :no_live_callees} -> + {%SL{sl | data: %Sess{data | error: "wamp.error.no_callees_registered"}}, + [{:next_event, :internal, :transition}, {:next_event, :internal, :error}]} end - {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions} + {:ok, data, actions} end @impl true @@ -497,14 +507,8 @@ defmodule Wampex.Router.Session do subscriptions: subs } }) do - filter = fn {id, values}, value -> - {id, List.delete(values, value)} - end - - me = {self(), Node.self()} - Enum.each(regs, fn {id, proc} -> - ClusterKV.update(db, proc, {id, me}, filter) + remove_registration(regs, id, db) end) send_to_peer( @@ -514,6 +518,22 @@ defmodule Wampex.Router.Session do ) end + defp remove_registration(regs, registration_id, db) do + {id, proc} = + reg = + Enum.find(regs, fn + {^registration_id, proc} -> true + _ -> false + end) + + filter = fn {id, values}, value -> + {id, List.delete(values, value)} + end + + ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter) + List.delete(regs, reg) + end + defp get_live_callee(proxy, []), do: {:error, :no_live_callees} defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do -- 2.45.3