end
@impl true
- def handle([@unsubscribed, request_id]) do
+ def handle(<<@unsubscribed, request_id>>) do
{[{:next_event, :internal, :established}], request_id, :ok}
end
alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
alias Wampex.Router
alias Wampex.Serializers.JSON
- alias Broker.{Subscribed, Published, Event}
+ alias Broker.{Subscribed, Published, Event, Unsubscribed}
alias Dealer.{Invocation, Registered, Result, Unregistered}
alias Router.Authentication
@register "Register"
@unregister "Unregister"
@subscribe "Subscribe"
- # @unsubscribe "Unsubscribe"
+ @unsubscribe "Unsubscribe"
@publish "Publish"
# @error "Error"
@abort "Abort"
@handle_register "HandleRegister"
@handle_unregister "HandleUnregister"
@handle_subscribe "HandleSubscribe"
- # @handle_unsubscribe "HandleUnsubscribe"
+ @handle_unsubscribe "HandleUnsubscribe"
@handle_publish "HandlePublish"
# @handle_interrupt "HandleInterrupt"
@handle_abort "HandleAbort"
) do
id = get_global_id()
- case opts do
- %{"match" => "wildcard"} ->
- ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
+ wc =
+ case opts do
+ %{"match" => "wildcard"} ->
+ ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
+ true
- _ ->
- ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
- end
+ _ ->
+ ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
+ false
+ end
send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
- {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}},
+ {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic, wc} | subs]}},
+ [{:next_event, :internal, :transition}]}
+ end
+
+ @impl true
+ def handle_resource(
+ @handle_unsubscribe,
+ _,
+ @unsubscribe,
+ %SL{
+ data: %Sess{
+ realm: realm,
+ transport: tt,
+ transport_pid: t,
+ db: db,
+ subscriptions: subs,
+ unsubscribe: {:unsubscribe, rid, subscription_id}
+ }
+ } = sl
+ ) do
+ subs = remove_subscription(subs, subscription_id, realm, db)
+
+ send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
+
+ {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
[{:next_event, :internal, :transition}]}
end
transport_pid: t,
registrations: regs,
realm: realm,
- subscriptions: _subs
+ subscriptions: subs
}
}) do
Logger.warn("Router session terminating #{inspect(reason)}")
remove_registration(regs, realm, id, db)
end)
+ Enum.each(subs, fn {id, _proc, _} ->
+ remove_subscription(subs, id, realm, db)
+ end)
+
send_to_peer(
Peer.abort(%Abort{reason: "wamp.error.protocol_violation"}),
tt,
)
end
+ defp remove_subscription(subs, subscription_id, realm, db) do
+ {id, topic, wc} =
+ sub =
+ Enum.find(subs, fn
+ {^subscription_id, _topic, _} -> true
+ _ -> false
+ end)
+
+ {key, filter} =
+ case wc do
+ true ->
+ key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
+
+ {key,
+ fn {id, values}, value ->
+ {id,
+ Enum.filter(values, fn
+ {_, ^value} -> false
+ _ -> true
+ end)}
+ end}
+
+ false ->
+ {topic,
+ fn {id, values}, value ->
+ {id, List.delete(values, value)}
+ end}
+ end
+
+ ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
+ List.delete(subs, sub)
+ end
+
defp remove_registration(regs, realm, registration_id, db) do
{id, proc} =
reg =
defp deps do
[
+ # {:cluster_kv, path: "../cluster_kv"},
{:cluster_kv,
git: "https://gitlab.com/entropealabs/cluster_kv.git",
- tag: "b169f82084f1218134407ab745b0734bbc5ef69c"},
+ tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
{:cors_plug, "~> 2.0"},
{:credo, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
- "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "b169f82084f1218134407ab745b0734bbc5ef69c", [tag: "b169f82084f1218134407ab745b0734bbc5ef69c"]},
+ "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "4c36b8d68f40711cd167edb9917d32a992f49561", [tag: "4c36b8d68f40711cd167edb9917d32a992f49561"]},
"conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
"cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"},
"coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"},
end
def init({test, name, topic}) do
- {:ok, sub} = Client.subscribe(name, %Subscribe{topic: topic})
- {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
- {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data"})
+ {:ok, sub1} = Client.subscribe(name, %Subscribe{topic: topic})
+ {:ok, sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
+ {:ok, sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"})
- {:ok, sub} =
+ {:ok, sub4} =
Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
- {:ok, sub} =
+ {:ok, sub5} =
Client.subscribe(name, %Subscribe{
topic: "com..test.temp",
options: %{"match" => "wildcard"}
})
- send(test, {:subscribed, sub})
- {:ok, {test, name, sub}}
+ send(test, {:subscribed, sub5})
+ {:ok, {test, name, [sub1, sub2, sub3, sub4, sub5]}}
end
def handle_info(event, {test, _name, _sub} = state) do
{:noreply, state}
end
- def terminate(_r, {_test, name, sub}) do
- Client.send_request(
- name,
- Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub})
- )
+ def terminate(_r, {_test, name, subs}) do
+ Enum.each(subs, fn sub ->
+ Client.send_request(
+ name,
+ Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub})
+ )
+ end)
:ok
end
assert_receive {:registered, id}
end
+ @tag :router
+ test "get wildcard key" do
+ subscriber_name = TestSubscriber
+ topic = ".test.light..status"
+ {:ok, _pid} = Client.start_link(name: subscriber_name, session: @session)
+
+ assert {:ok, id} =
+ Client.send_request(
+ subscriber_name,
+ Subscriber.subscribe(%Subscribe{topic: topic, options: %{match: "wildcard"}})
+ )
+
+ assert :ok =
+ Client.send_request(
+ subscriber_name,
+ Subscriber.unsubscribe(%Unsubscribe{subscription_id: id})
+ )
+ end
+
@tag :abort
test "abort" do
Process.flag(:trap_exit, true)