]> Entropealabs - wampex.git/commitdiff
adds unsubscribe and removes subscriptions on session close
authorChristopher <chris@entropealabs.com>
Wed, 18 Mar 2020 20:34:12 +0000 (15:34 -0500)
committerChristopher <chris@entropealabs.com>
Wed, 18 Mar 2020 20:34:12 +0000 (15:34 -0500)
lib/roles/subscriber.ex
lib/router/session.ex
mix.exs
mix.lock
test/support/test_subscriber.ex
test/wampex_test.exs

index dfced57332f3b57e18ef8d812dc9b8833509bbf4..de25f2ed7f68ceea83677abe1e220900511f45c4 100644 (file)
@@ -54,7 +54,7 @@ defmodule Wampex.Roles.Subscriber do
   end
 
   @impl true
-  def handle([@unsubscribed, request_id]) do
+  def handle(<<@unsubscribed, request_id>>) do
     {[{:next_event, :internal, :established}], request_id, :ok}
   end
 
index e68ed33da43cfd75554c1123ab5deaf183b94ae0..0727349b8da8160269aa91b8d735f0617cf8ad21 100644 (file)
@@ -13,7 +13,7 @@ defmodule Wampex.Router.Session do
   alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
   alias Wampex.Router
   alias Wampex.Serializers.JSON
-  alias Broker.{Subscribed, Published, Event}
+  alias Broker.{Subscribed, Published, Event, Unsubscribed}
   alias Dealer.{Invocation, Registered, Result, Unregistered}
   alias Router.Authentication
 
@@ -90,7 +90,7 @@ defmodule Wampex.Router.Session do
   @register "Register"
   @unregister "Unregister"
   @subscribe "Subscribe"
-  @unsubscribe "Unsubscribe"
+  @unsubscribe "Unsubscribe"
   @publish "Publish"
   # @error "Error"
   @abort "Abort"
@@ -107,7 +107,7 @@ defmodule Wampex.Router.Session do
   @handle_register "HandleRegister"
   @handle_unregister "HandleUnregister"
   @handle_subscribe "HandleSubscribe"
-  @handle_unsubscribe "HandleUnsubscribe"
+  @handle_unsubscribe "HandleUnsubscribe"
   @handle_publish "HandlePublish"
   # @handle_interrupt "HandleInterrupt"
   @handle_abort "HandleAbort"
@@ -312,17 +312,44 @@ defmodule Wampex.Router.Session do
       ) do
     id = get_global_id()
 
-    case opts do
-      %{"match" => "wildcard"} ->
-        ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
+    wc =
+      case opts do
+        %{"match" => "wildcard"} ->
+          ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
+          true
 
-      _ ->
-        ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
-    end
+        _ ->
+          ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
+          false
+      end
 
     send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
 
-    {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}},
+    {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}},
+     [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_unsubscribe,
+        _,
+        @unsubscribe,
+        %SL{
+          data: %Sess{
+            realm: realm,
+            transport: tt,
+            transport_pid: t,
+            db: db,
+            subscriptions: subs,
+            unsubscribe: {:unsubscribe, rid, subscription_id}
+          }
+        } = sl
+      ) do
+    subs = remove_subscription(subs, subscription_id, realm, db)
+
+    send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
+
+    {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
      [{:next_event, :internal, :transition}]}
   end
 
@@ -597,7 +624,7 @@ defmodule Wampex.Router.Session do
           transport_pid: t,
           registrations: regs,
           realm: realm,
-          subscriptions: _subs
+          subscriptions: subs
         }
       }) do
     Logger.warn("Router session terminating #{inspect(reason)}")
@@ -606,6 +633,10 @@ defmodule Wampex.Router.Session do
       remove_registration(regs, realm, id, db)
     end)
 
+    Enum.each(subs, fn {id, _proc, _} ->
+      remove_subscription(subs, id, realm, db)
+    end)
+
     send_to_peer(
       Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
       tt,
@@ -613,6 +644,39 @@ defmodule Wampex.Router.Session do
     )
   end
 
+  defp remove_subscription(subs, subscription_id, realm, db) do
+    {id, topic, wc} =
+      sub =
+      Enum.find(subs, fn
+        {^subscription_id, _topic, _} -> true
+        _ -> false
+      end)
+
+    {key, filter} =
+      case wc do
+        true ->
+          key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
+
+          {key,
+           fn {id, values}, value ->
+             {id,
+              Enum.filter(values, fn
+                {_, ^value} -> false
+                _ -> true
+              end)}
+           end}
+
+        false ->
+          {topic,
+           fn {id, values}, value ->
+             {id, List.delete(values, value)}
+           end}
+      end
+
+    ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
+    List.delete(subs, sub)
+  end
+
   defp remove_registration(regs, realm, registration_id, db) do
     {id, proc} =
       reg =
diff --git a/mix.exs b/mix.exs
index 05c6a1b3aa78fa7f2edb7eca0ba6e6b9bc5378ff..15dfcb858018d06ef510bf32fa9a289564be32e0 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -37,9 +37,10 @@ defmodule Wampex.MixProject do
 
   defp deps do
     [
+      # {:cluster_kv, path: "../cluster_kv"},
       {:cluster_kv,
        git: "https://gitlab.com/entropealabs/cluster_kv.git",
-       tag: "b169f82084f1218134407ab745b0734bbc5ef69c"},
+       tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
       {:cors_plug, "~> 2.0"},
       {:credo, "~> 1.2", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
index 8511a809002a30ac2cbf8fd830556d533af28540..c629b4f9011ed756142c605237fcc2a78a897a2b 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -1,7 +1,7 @@
 %{
   "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
   "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
-  "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "b169f82084f1218134407ab745b0734bbc5ef69c", [tag: "b169f82084f1218134407ab745b0734bbc5ef69c"]},
+  "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "4c36b8d68f40711cd167edb9917d32a992f49561", [tag: "4c36b8d68f40711cd167edb9917d32a992f49561"]},
   "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
   "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"},
   "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"},
index 1c803b3ac9865fb67386cfe69eb143b0b553fc9c..e86d354e30263681e0bbcf5d479107f9a968c652 100644 (file)
@@ -11,21 +11,21 @@ defmodule TestSubscriber do
   end
 
   def init({test, name, topic}) do
-    {:ok, sub} = Client.subscribe(name, %Subscribe{topic: topic})
-    {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
-    {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data"})
+    {:ok, sub1} = Client.subscribe(name, %Subscribe{topic: topic})
+    {:ok, sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
+    {:ok, sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"})
 
-    {:ok, sub} =
+    {:ok, sub4} =
       Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
 
-    {:ok, sub} =
+    {:ok, sub5} =
       Client.subscribe(name, %Subscribe{
         topic: "com..test.temp",
         options: %{"match" => "wildcard"}
       })
 
-    send(test, {:subscribed, sub})
-    {:ok, {test, name, sub}}
+    send(test, {:subscribed, sub5})
+    {:ok, {test, name, [sub1, sub2, sub3, sub4, sub5]}}
   end
 
   def handle_info(event, {test, _name, _sub} = state) do
@@ -33,11 +33,13 @@ defmodule TestSubscriber do
     {:noreply, state}
   end
 
-  def terminate(_r, {_test, name, sub}) do
-    Client.send_request(
-      name,
-      Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub})
-    )
+  def terminate(_r, {_test, name, subs}) do
+    Enum.each(subs, fn sub ->
+      Client.send_request(
+        name,
+        Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub})
+      )
+    end)
 
     :ok
   end
index 77027f0e4455468c59511bb30460306d137ee140..24c4631834f80eeee29e1e9b2c4d10e4c20749c5 100644 (file)
@@ -282,6 +282,25 @@ defmodule WampexTest do
     assert_receive {:registered, id}
   end
 
+  @tag :router
+  test "get wildcard key" do
+    subscriber_name = TestSubscriber
+    topic = ".test.light..status"
+    {:ok, _pid} = Client.start_link(name: subscriber_name, session: @session)
+
+    assert {:ok, id} =
+             Client.send_request(
+               subscriber_name,
+               Subscriber.subscribe(%Subscribe{topic: topic, options: %{match: "wildcard"}})
+             )
+
+    assert :ok =
+             Client.send_request(
+               subscriber_name,
+               Subscriber.unsubscribe(%Unsubscribe{subscription_id: id})
+             )
+  end
+
   @tag :abort
   test "abort" do
     Process.flag(:trap_exit, true)