From: Christopher Date: Wed, 18 Mar 2020 20:34:12 +0000 (-0500) Subject: adds unsubscribe and removes subscriptions on session close X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=7ef685da3ff2bf49701df5b65f26274caf5664e7;p=wampex_router.git adds unsubscribe and removes subscriptions on session close --- diff --git a/lib/roles/subscriber.ex b/lib/roles/subscriber.ex index dfced57..de25f2e 100644 --- a/lib/roles/subscriber.ex +++ b/lib/roles/subscriber.ex @@ -54,7 +54,7 @@ defmodule Wampex.Roles.Subscriber do end @impl true - def handle([@unsubscribed, request_id]) do + def handle(<<@unsubscribed, request_id>>) do {[{:next_event, :internal, :established}], request_id, :ok} end diff --git a/lib/router/session.ex b/lib/router/session.ex index e68ed33..0727349 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -13,7 +13,7 @@ defmodule Wampex.Router.Session do alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome} alias Wampex.Router alias Wampex.Serializers.JSON - alias Broker.{Subscribed, Published, Event} + alias Broker.{Subscribed, Published, Event, Unsubscribed} alias Dealer.{Invocation, Registered, Result, Unregistered} alias Router.Authentication @@ -90,7 +90,7 @@ defmodule Wampex.Router.Session do @register "Register" @unregister "Unregister" @subscribe "Subscribe" - # @unsubscribe "Unsubscribe" + @unsubscribe "Unsubscribe" @publish "Publish" # @error "Error" @abort "Abort" @@ -107,7 +107,7 @@ defmodule Wampex.Router.Session do @handle_register "HandleRegister" @handle_unregister "HandleUnregister" @handle_subscribe "HandleSubscribe" - # @handle_unsubscribe "HandleUnsubscribe" + @handle_unsubscribe "HandleUnsubscribe" @handle_publish "HandlePublish" # @handle_interrupt "HandleInterrupt" @handle_abort "HandleAbort" @@ -312,17 +312,44 @@ defmodule Wampex.Router.Session do ) do id = get_global_id() - case opts do - %{"match" => "wildcard"} -> - ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "") + wc = + case opts do + %{"match" => "wildcard"} -> + ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "") + true - _ -> - ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}}) - end + _ -> + ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}}) + false + end send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t) - {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}}, + {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}}, + [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_unsubscribe, + _, + @unsubscribe, + %SL{ + data: %Sess{ + realm: realm, + transport: tt, + transport_pid: t, + db: db, + subscriptions: subs, + unsubscribe: {:unsubscribe, rid, subscription_id} + } + } = sl + ) do + subs = remove_subscription(subs, subscription_id, realm, db) + + send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t) + + {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, [{:next_event, :internal, :transition}]} end @@ -597,7 +624,7 @@ defmodule Wampex.Router.Session do transport_pid: t, registrations: regs, realm: realm, - subscriptions: _subs + subscriptions: subs } }) do Logger.warn("Router session terminating #{inspect(reason)}") @@ -606,6 +633,10 @@ defmodule Wampex.Router.Session do remove_registration(regs, realm, id, db) end) + Enum.each(subs, fn {id, _proc, _} -> + remove_subscription(subs, id, realm, db) + end) + send_to_peer( Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}), tt, @@ -613,6 +644,39 @@ defmodule Wampex.Router.Session do ) end + defp remove_subscription(subs, subscription_id, realm, db) do + {id, topic, wc} = + sub = + Enum.find(subs, fn + {^subscription_id, _topic, _} -> true + _ -> false + end) + + {key, filter} = + case wc do + true -> + key = ClusterKV.get_wildcard_key(topic, ".", ":", "") + + {key, + fn {id, values}, value -> + {id, + Enum.filter(values, fn + {_, ^value} -> false + _ -> true + end)} + end} + + false -> + {topic, + fn {id, values}, value -> + {id, List.delete(values, value)} + end} + end + + ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter) + List.delete(subs, sub) + end + defp remove_registration(regs, realm, registration_id, db) do {id, proc} = reg = diff --git a/mix.exs b/mix.exs index 05c6a1b..15dfcb8 100644 --- a/mix.exs +++ b/mix.exs @@ -37,9 +37,10 @@ defmodule Wampex.MixProject do defp deps do [ + # {:cluster_kv, path: "../cluster_kv"}, {:cluster_kv, git: "https://gitlab.com/entropealabs/cluster_kv.git", - tag: "b169f82084f1218134407ab745b0734bbc5ef69c"}, + tag: "4c36b8d68f40711cd167edb9917d32a992f49561"}, {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 8511a80..c629b4f 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,7 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, - "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "b169f82084f1218134407ab745b0734bbc5ef69c", [tag: "b169f82084f1218134407ab745b0734bbc5ef69c"]}, + "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "4c36b8d68f40711cd167edb9917d32a992f49561", [tag: "4c36b8d68f40711cd167edb9917d32a992f49561"]}, "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"}, "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"}, diff --git a/test/support/test_subscriber.ex b/test/support/test_subscriber.ex index 1c803b3..e86d354 100644 --- a/test/support/test_subscriber.ex +++ b/test/support/test_subscriber.ex @@ -11,21 +11,21 @@ defmodule TestSubscriber do end def init({test, name, topic}) do - {:ok, sub} = Client.subscribe(name, %Subscribe{topic: topic}) - {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data.test"}) - {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data"}) + {:ok, sub1} = Client.subscribe(name, %Subscribe{topic: topic}) + {:ok, sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"}) + {:ok, sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"}) - {:ok, sub} = + {:ok, sub4} = Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}}) - {:ok, sub} = + {:ok, sub5} = Client.subscribe(name, %Subscribe{ topic: "com..test.temp", options: %{"match" => "wildcard"} }) - send(test, {:subscribed, sub}) - {:ok, {test, name, sub}} + send(test, {:subscribed, sub5}) + {:ok, {test, name, [sub1, sub2, sub3, sub4, sub5]}} end def handle_info(event, {test, _name, _sub} = state) do @@ -33,11 +33,13 @@ defmodule TestSubscriber do {:noreply, state} end - def terminate(_r, {_test, name, sub}) do - Client.send_request( - name, - Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub}) - ) + def terminate(_r, {_test, name, subs}) do + Enum.each(subs, fn sub -> + Client.send_request( + name, + Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub}) + ) + end) :ok end diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 77027f0..24c4631 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -282,6 +282,25 @@ defmodule WampexTest do assert_receive {:registered, id} end + @tag :router + test "get wildcard key" do + subscriber_name = TestSubscriber + topic = ".test.light..status" + {:ok, _pid} = Client.start_link(name: subscriber_name, session: @session) + + assert {:ok, id} = + Client.send_request( + subscriber_name, + Subscriber.subscribe(%Subscribe{topic: topic, options: %{match: "wildcard"}}) + ) + + assert :ok = + Client.send_request( + subscriber_name, + Subscriber.unsubscribe(%Unsubscribe{subscription_id: id}) + ) + end + @tag :abort test "abort" do Process.flag(:trap_exit, true)