assert {"test1:test", [:world, :cruel, :hello]} = ClusterKV.get(db, @keyspace, "test1:test")
- assert [{["com", "test", "", "temp"], {1_003_039_494, me}}] =
+ assert [{["com", "test", "", "temp"], {1_003_039_494, _me}}] =
ClusterKV.wildcard(
db,
@keyspace,
defmodule TestCallee do
@moduledoc false
- use GenServer
+ use Wampex.Client.Handler
+ @behaviour Wampex.Client.Handler
require Logger
- alias Wampex.Client
- alias Wampex.Roles.{Callee, Dealer}
- alias Wampex.Roles.Peer.Error
- alias Callee.{Register, Yield}
- alias Dealer.Invocation
- def start_link(test, name, device) do
- GenServer.start_link(__MODULE__, {test, name, device})
- end
-
- def init({test, name, device}) do
- Client.add(name, self())
- {:ok, {test, name, device}}
- end
+ @impl true
+ def do_init(test: test, device: device), do: %{device: device, test: test, client_name: nil}
- def handle_info({:connected, _}, {test, name, device} = state) do
- {:ok, _reg} = Client.register(name, %Register{procedure: "com.actuator.#{device}.light"})
- {:ok, reg} = Client.register(name, %Register{procedure: "return.error"})
- send(test, {:registered, reg})
+ @impl true
+ def handle_continue({:registered, %{registrations: [{:ok, id} | _]}}, %{test: test} = state) do
+ send(test, {:registered, id})
{:noreply, state}
end
- def handle_info(
- %Invocation{
- request_id: id,
- details: %{"procedure" => "return.error"},
- arg_list: al,
- arg_kw: akw
- },
- {_test, name, _device} = state
- ) do
- Client.error(
- name,
- %Error{request_id: id, error: "this.is.an.error", arg_list: al, arg_kw: akw}
- )
-
- {:noreply, state}
+ invocation("return.error", al, akw, state) do
+ {:error, "this.is.an.error", al, akw, state}
end
- def handle_info(
- %Invocation{request_id: id, details: _dets, arg_kw: arg_kw} = invocation,
- {test, name, _device} = state
- ) do
- send(test, invocation)
-
- Client.yield(
- name,
- %Yield{
- request_id: id,
- arg_list: [:ok],
- arg_kw: %{color: Map.get(arg_kw, "color")}
- }
- )
-
- {:noreply, state}
+ invocation("com.actuator.as987d9a8sd79a87ds.light", al, akw, %{test: test} = state) do
+ send(test, al)
+ {:ok, [:ok], %{color: Map.get(akw, "color")}, state}
end
end
defmodule TestSubscriber do
@moduledoc false
- use GenServer
+ use Wampex.Client.Handler
+ @behaviour Wampex.Client.Handler
require Logger
- alias Wampex.Client
- alias Wampex.Roles.Subscriber.Subscribe
- def start_link(test, name, topic) do
- GenServer.start_link(__MODULE__, {test, name, topic})
+ @impl true
+ def do_init(test: test, topic: topic), do: %{test: test, topic: topic, client_name: nil}
+
+ def handle_continue({:registered, %{subscriptions: [{:ok, id} | _]}}, %{test: test} = state) do
+ send(test, {:subscribed, id})
+ {:noreply, state}
end
- def init({test, name, topic}) do
- Client.add(name, self())
- {:ok, {test, name, topic}}
+ event("com.data.test.temp", al, _kw, %{test: test} = state) do
+ send(test, al)
+ state
end
- def handle_info({:connected, _client}, {test, name, topic}) do
- {:ok, _sub1} = Client.subscribe(name, %Subscribe{topic: topic})
- {:ok, _sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
- {:ok, _sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"})
+ event("com.data.test", al, _kw, %{test: test} = state) do
+ send(test, al)
+ state
+ end
- {:ok, _sub4} =
- Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
+ event("com.data", al, _kw, %{test: test} = state) do
+ send(test, al)
+ state
+ end
- {:ok, sub5} =
- Client.subscribe(name, %Subscribe{
- topic: "com..test.temp",
- options: %{"match" => "wildcard"}
- })
+ event({"com...temp", "wildcard"}, al, _kw, %{test: test} = state) do
+ send(test, al)
+ state
+ end
- send(test, {:subscribed, sub5})
- {:noreply, {test, name, topic}}
+ event({"com..test.temp", "wildcard"}, al, _kw, %{test: test} = state) do
+ send(test, al)
+ state
end
- def handle_info(event, {test, _name, _topic} = state) do
- send(test, event)
- {:noreply, state}
+ event("router.peer.disconnect", al, _kw, %{test: test} = state) do
+ send(test, al)
+ state
end
end
alias Wampex.Client
alias Wampex.Client.{Authentication, Realm, Session}
- alias Wampex.Roles.Broker.Event
alias Wampex.Roles.{Callee, Caller, Peer, Publisher, Subscriber}
alias Wampex.Roles.Caller.Call
- alias Wampex.Roles.Dealer.{Invocation, Result}
+ alias Wampex.Roles.Dealer.Result
alias Wampex.Roles.Peer.{Error, Hello}
alias Wampex.Roles.Publisher.Publish
alias Wampex.Router
test "role not supported" do
callee_name = TestCalleeAbortRegistration
Client.start_link(name: callee_name, session: @session, reconnect: false)
- TestCallee.start_link(self(), callee_name, @device)
+ TestCallee.start_link(test: self(), client_name: callee_name, device: @device)
name = TestCallerFail
[_ | t] = @session.roles
test "callee registration" do
name = TestCalleeRegistration
Client.start_link(name: name, session: @session, reconnect: false)
- TestCallee.start_link(self(), name, @device)
- assert_receive {:registered, id}, 500
+ TestCallee.start_link(test: self(), client_name: name, device: @device)
+ assert_receive {:registered, _id}, 500
end
@tag :abort
test "caller receives error when callee returns an error" do
callee_name = TestErrorCalleeRespond
Client.start_link(name: callee_name, session: @session, reconnect: false)
- TestCallee.start_link(self(), callee_name, @device)
- assert_receive {:registered, id}
+ TestCallee.start_link(test: self(), client_name: callee_name, device: @device)
+ assert_receive {:registered, _id}
caller_name = TestErrorCaller
Client.start_link(name: caller_name, session: @session, reconnect: false)
test "callee is invoked and responds and caller gets result" do
callee_name = TestCalleeRespond
Client.start_link(name: callee_name, session: @session, reconnect: false)
- TestCallee.start_link(self(), callee_name, @device)
- assert_receive {:registered, id}
+ TestCallee.start_link(test: self(), client_name: callee_name, device: @device)
+ assert_receive {:registered, _id}
caller_name = TestCaller
Client.start_link(name: caller_name, session: @session, reconnect: false)
}
)
- assert_receive %Invocation{}
+ assert_receive [1]
end
@tag :client
test "subscriber registration" do
name = TestSubscriberRegister
Client.start_link(name: name, session: @session, reconnect: false)
- TestSubscriber.start_link(self(), name, "com.data.temp")
- assert_receive {:subscribed, id}
+ TestSubscriber.start_link(test: self(), client_name: name, topic: "com.data.temp")
+ assert_receive {:subscribed, _id}
end
@tag :client
test "subscriber receives disconnect event" do
name = TestSubscriberDisconnectEvents
Client.start_link(name: name, session: @session, reconnect: false)
- TestSubscriber.start_link(self(), name, "router.peer.disconnect")
- assert_receive {:subscribed, id}
+ TestSubscriber.start_link(test: self(), client_name: name, topic: "router.peer.disconnect")
+ assert_receive {:subscribed, _id}
Task.start(fn ->
{:ok, pid} =
Process.exit(pid, :normal)
end)
- assert_receive %Event{details: %{"topic" => "router.peer.disconnect"}}, 500
+ assert_receive [], 500
end
@tag :client
test "subscriber receives events from publisher" do
name = TestSubscriberEvents
Client.start_link(name: name, session: @session, reconnect: false)
- TestSubscriber.start_link(self(), name, "com.data.test.temp")
+ TestSubscriber.start_link(test: self(), client_name: name, topic: "com.data.test.temp")
Client.start_link(name: TestPublisher, session: @session, reconnect: false)
}
)
- assert_receive %Event{details: %{"topic" => "com.data.test.temp"}}
- assert_receive %Event{details: %{"topic" => "com.data.test.temp"}}
- assert_receive %Event{details: %{"topic" => "com.data.test.temp"}}
+ assert_receive {:subscribed, _}
+ assert_receive [12.5, 45.6, 87.5]
+ assert_receive [12.5, 45.6, 87.5]
end
end