alias Wampex.Session, as: Sess
alias Wampex.Role.{Callee, Subscriber}
+ alias Wampex.Role.Callee.Register
+ alias Wampex.Role.Subscriber.Subscribe
@type message_part :: integer() | binary() | map() | list()
@type message :: nonempty_list(message_part())
@spec transport_name(module()) :: module()
def transport_name(name), do: Module.concat([name, Transport])
- @spec subscribe(name :: module(), topic :: binary(), timeout :: integer()) :: {:ok, integer()}
- def subscribe(name, topic, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(topic), timeout)
- Registry.register(subscriber_registry_name(name), id, topic)
+ @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
+ {:ok, integer()}
+ def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
+ {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
+ Registry.register(subscriber_registry_name(name), id, t)
{:ok, id}
end
- @spec register(name :: module(), procedure :: binary(), timeout :: integer()) ::
+ @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
{:ok, integer()}
- def register(name, procedure, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Callee.register(procedure), timeout)
- Registry.register(callee_registry_name(name), id, procedure)
+ def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
+ {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
+ Registry.register(callee_registry_name(name), id, p)
{:ok, id}
end
@invocation 68
@yield 70
- @impl true
- def add(roles) do
- Map.put(roles, :callee, %{})
+ defmodule Register do
+ @moduledoc false
+ @enforce_keys [:procedure]
+ defstruct [:procedure, options: %{}]
+
+ @type t :: %__MODULE__{
+ procedure: binary(),
+ options: map()
+ }
end
- @spec register(procedure :: binary()) :: Wampex.message()
- def register(procedure) do
- [@register, %{}, procedure]
+ defmodule Unregister do
+ @moduledoc false
+ @enforce_keys [:registration_id]
+ defstruct [:registration_id]
+
+ @type t :: %__MODULE__{
+ registration_id: integer()
+ }
end
- @spec unregister(registration_id :: integer()) :: Wampex.message()
- def unregister(registration_id) do
- [@unregister, registration_id]
+ defmodule Yield do
+ @moduledoc false
+ @enforce_keys [:request_id]
+ defstruct [:request_id, :arg_list, :arg_kw, options: %{}]
+
+ @type t :: %__MODULE__{
+ request_id: integer(),
+ arg_list: nonempty_list(any()) | nil,
+ arg_kw: map() | nil,
+ options: map()
+ }
+ end
+
+ @impl true
+ def add(roles) do
+ Map.put(roles, :callee, %{})
end
- @spec yield(request_id :: integer()) :: Wampex.message()
- def yield(request_id) do
- [@yield, request_id, %{}]
+ @spec register(Register.t()) :: Wampex.message()
+ def register(%Register{procedure: p, options: opts}) do
+ [@register, opts, p]
end
- @spec yield(request_id :: integer(), arg_l :: nonempty_list(any())) :: Wampex.message()
- def yield(request_id, arg_l) do
- [@yield, request_id, %{}, arg_l]
+ @spec unregister(Unregister.t()) :: Wampex.message()
+ def unregister(%Unregister{registration_id: ri}) do
+ [@unregister, ri]
end
- @spec yield(request_id :: integer(), arg_l :: nonempty_list(any()), arg_kw :: map()) ::
- Wampex.message()
- def yield(request_id, arg_l, arg_kw) do
- [@yield, request_id, %{}, arg_l, arg_kw]
+ @spec yield(Yield.t()) :: Wampex.message()
+ def yield(%Yield{request_id: ri, arg_list: al, arg_kw: akw, options: opts}) do
+ [@yield, ri, opts, al, akw]
end
@impl true
@call 48
@result 50
+ defmodule Call do
+ @moduledoc false
+ @enforce_keys [:procedure]
+ defstruct [:procedure, :arg_list, :arg_kw, options: %{}]
+
+ @type t :: %__MODULE__{
+ procedure: binary(),
+ arg_list: nonempty_list(any()) | nil,
+ arg_kw: map() | nil,
+ options: map()
+ }
+ end
+
@impl true
def add(roles) do
Map.put(roles, :caller, %{})
end
- @spec call(procedure :: binary()) :: Wampex.message()
- def call(procedure) do
- [@call, %{}, procedure]
- end
-
- @spec call(procedure :: binary(), arg_l :: nonempty_list(any())) :: Wampex.message()
- def call(procedure, arg_l) do
- [@call, %{}, procedure, arg_l]
- end
-
- @spec call(procedure :: binary(), arg_l :: nonempty_list(any()), arg_kw :: map()) ::
- Wampex.message()
- def call(procedure, arg_l, arg_kw) do
- [@call, %{}, procedure, arg_l, arg_kw]
+ @spec call(Call.t()) :: Wampex.message()
+ def call(%Call{procedure: p, arg_list: al, arg_kw: akw, options: opts}) do
+ [@call, opts, p, al, akw]
end
@impl true
@goodbye 6
@error 8
+ defmodule Hello do
+ @moduledoc false
+ @enforce_keys [:realm, :roles]
+ defstruct [:realm, :roles, agent: "WAMPex"]
+
+ @type t :: %__MODULE__{
+ realm: binary(),
+ roles: nonempty_list(module()),
+ agent: binary()
+ }
+ end
+
+ defmodule Goodbye do
+ @moduledoc false
+ @enforce_keys [:reason]
+ defstruct [:reason, options: %{}]
+
+ @type t :: %__MODULE__{
+ reason: binary(),
+ options: map()
+ }
+ end
+
@impl true
def add(roles), do: roles
- @spec hello(realm :: binary(), roles :: nonempty_list(module())) :: Wampex.message()
- def hello(realm, roles) do
- [@hello, realm, %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}]
+ @spec hello(Hello.t()) :: Wampex.message()
+ def hello(%Hello{realm: r, roles: roles, agent: agent}) do
+ [@hello, r, %{agent: agent, roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}]
end
- @spec goodbye(reason :: binary()) :: Wampex.message()
- def goodbye(reason) do
- [@goodbye, %{}, reason]
+ @spec goodbye(Goodbye.t()) :: Wampex.message()
+ def goodbye(%Goodbye{reason: r, options: opts}) do
+ [@goodbye, opts, r]
end
@impl true
@publish 16
@published 17
- @impl true
- def add(roles) do
- Map.put(roles, :publisher, %{})
- end
+ defmodule Publish do
+ @moduledoc false
+ @enforce_keys [:topic]
+ defstruct [:topic, :arg_list, :arg_kw, options: %{}]
- @spec publish(topic :: binary()) :: Wampex.message()
- def publish(topic) do
- [@publish, %{}, topic]
+ @type t :: %__MODULE__{
+ topic: binary(),
+ arg_list: nonempty_list(any()) | nil,
+ arg_kw: map() | nil,
+ options: map()
+ }
end
- @spec publish(topic :: binary(), arg_l :: nonempty_list(any())) :: Wampex.message()
- def publish(topic, arg_l) do
- [@publish, %{}, topic, arg_l]
+ @impl true
+ def add(roles) do
+ Map.put(roles, :publisher, %{})
end
- @spec publish(topic :: binary(), arg_l :: nonempty_list(any()), arg_kw :: map()) ::
- Wampex.message()
- def publish(topic, arg_l, arg_kw) do
- [@publish, %{}, topic, arg_l, arg_kw]
+ @spec publish(Publish.t()) :: Wampex.message()
+ def publish(%Publish{topic: t, options: opts, arg_list: al, arg_kw: akw}) do
+ [@publish, opts, t, al, akw]
end
@impl true
@unsubscribed 35
@event 36
+ defmodule Subscribe do
+ @moduledoc false
+ @enforce_keys [:topic]
+ defstruct [:topic, options: %{}]
+
+ @type t :: %__MODULE__{
+ topic: binary(),
+ options: map()
+ }
+ end
+
+ defmodule Unsubscribe do
+ @moduledoc false
+ @enforce_keys [:subscription_id]
+ defstruct [:subscription_id]
+
+ @type t :: %__MODULE__{
+ subscription_id: integer()
+ }
+ end
+
@impl true
def add(roles) do
Map.put(roles, :subscriber, %{})
end
- @spec subscribe(topic :: binary()) :: Wampex.message()
- def subscribe(topic) do
- [@subscribe, %{}, topic]
+ @spec subscribe(Subscribe.t()) :: Wampex.message()
+ def subscribe(%Subscribe{topic: t, options: opts}) do
+ [@subscribe, opts, t]
end
- @spec unsubscribe(subscription_id :: integer()) :: Wampex.message()
- def unsubscribe(subscription_id) do
- [@unsubscribe, subscription_id]
+ @spec unsubscribe(Unsubscribe.t()) :: Wampex.message()
+ def unsubscribe(%Unsubscribe{subscription_id: si}) do
+ [@unsubscribe, si]
end
@impl true
alias StatesLanguage, as: SL
alias Wampex.Realm
alias Wampex.Role.Peer
+ alias Wampex.Role.Peer.Hello
alias Wampex.Serializer.MessagePack
alias __MODULE__, as: Sess
alias Wampex.Transport.WebSocket
data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}}
} = data
) do
- tt.send_request(t, Peer.hello(realm, roles))
+ tt.send_request(t, Peer.hello(%Hello{realm: realm, roles: roles}))
{:ok, data, [{:next_event, :internal, :hello_sent}]}
end
defp do_send(r_id, tt, t, message) do
{request_id, message} = maybe_inject_request_id(r_id, message)
- tt.send_request(t, message)
+ tt.send_request(t, remove_nil_values(message))
request_id
end
+ defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
+
defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
defp maybe_inject_request_id(r_id, message) do
use GenServer
require Logger
alias Wampex.Role.Callee
+ alias Callee.{Register, Unregister, Yield}
def start_link(test, name, device) do
GenServer.start_link(__MODULE__, {test, name, device})
end
def init({test, name, device}) do
- {:ok, reg} = Wampex.register(name, "com.actuator.#{device}.light")
+ {:ok, reg} = Wampex.register(name, %Register{procedure: "com.actuator.#{device}.light"})
send(test, {:registered, reg})
{:ok, {test, name, reg}}
end
Wampex.cast_send_request(
name,
- Callee.yield(id, [:ok], %{color: Map.get(arg_kw, "color")})
+ Callee.yield(%Yield{
+ request_id: id,
+ arg_list: [:ok],
+ arg_kw: %{color: Map.get(arg_kw, "color")}
+ })
)
{:noreply, state}
def terminate(_reason, {_test, name, reg}) do
Wampex.send_request(
name,
- Callee.unregister(reg)
+ Callee.unregister(%Unregister{registration_id: reg})
)
:ok
use GenServer
require Logger
alias Wampex.Role.Subscriber
+ alias Subscriber.{Subscribe, Unsubscribe}
def start_link(test, name, topic) do
GenServer.start_link(__MODULE__, {test, name, topic})
end
def init({test, name, topic}) do
- {:ok, sub} = Wampex.subscribe(name, topic)
+ {:ok, sub} = Wampex.subscribe(name, %Subscribe{topic: topic})
send(test, {:subscribed, sub})
{:ok, {test, name, sub}}
end
def terminate(_r, {_test, name, sub}) do
Wampex.send_request(
name,
- Subscriber.unsubscribe(sub)
+ Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub})
)
:ok
alias Wampex.{Realm, Session}
alias Wampex.Role.{Callee, Caller, Peer, Publisher, Subscriber}
+ alias Callee.{Register, Unregister, Yield}
+ alias Caller.Call
+ alias Peer.{Goodbye, Hello}
+ alias Publisher.Publish
+ alias Subscriber.{Subscribe, Unsubscribe}
require Logger
@url "ws://localhost:18080/ws"
end
test "Callee.register" do
- assert [64, %{}, "test"] = Callee.register("test")
+ assert [64, %{}, "test"] = Callee.register(%Register{procedure: "test"})
end
test "Callee.unregister" do
- assert [66, 123_456] = Callee.unregister(123_456)
+ assert [66, 123_456] = Callee.unregister(%Unregister{registration_id: 123_456})
end
test "Callee.yield" do
- assert [70, 1234, %{}] = Callee.yield(1234)
- assert [70, 1234, %{}, []] = Callee.yield(1234, [])
- assert [70, 1234, %{}, [], %{}] = Callee.yield(1234, [], %{})
+ assert [70, 1234, %{}, nil, nil] = Callee.yield(%Yield{request_id: 1234})
+ assert [70, 1234, %{}, ["1"], nil] = Callee.yield(%Yield{request_id: 1234, arg_list: ["1"]})
+
+ assert [70, 1234, %{test: 1}, [2], %{}] =
+ Callee.yield(%Yield{
+ request_id: 1234,
+ arg_list: [2],
+ arg_kw: %{},
+ options: %{test: 1}
+ })
end
test "Caller.add" do
end
test "Caller.call" do
- assert [48, %{}, "test"] = Caller.call("test")
- assert [48, %{}, "test", []] = Caller.call("test", [])
- assert [48, %{}, "test", [], %{}] = Caller.call("test", [], %{})
+ assert [48, %{}, "test", nil, nil] = Caller.call(%Call{procedure: "test"})
+ assert [48, %{}, "test", [1], nil] = Caller.call(%Call{procedure: "test", arg_list: [1]})
+
+ assert [48, %{ok: 1}, "test", [1], %{test: 1}] =
+ Caller.call(%Call{
+ procedure: "test",
+ arg_list: [1],
+ arg_kw: %{test: 1},
+ options: %{ok: 1}
+ })
end
test "Peer.add" do
end
test "Peer.hello" do
- assert [1, "test", %{roles: %{callee: %{}}}] = Peer.hello("test", [Callee])
+ assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] =
+ Peer.hello(%Hello{realm: "test", roles: [Callee]})
end
test "Peer.gooodbye" do
- assert [6, %{}, "test"] = Peer.goodbye("test")
+ assert [6, %{}, "test"] = Peer.goodbye(%Goodbye{reason: "test"})
end
test "Publisher.add" do
end
test "Publisher.publish" do
- assert [16, %{}, "test"] = Publisher.publish("test")
- assert [16, %{}, "test", []] = Publisher.publish("test", [])
- assert [16, %{}, "test", [], %{}] = Publisher.publish("test", [], %{})
+ assert [16, %{}, "test", nil, nil] = Publisher.publish(%Publish{topic: "test"})
+ assert [16, %{}, "test", [1], nil] = Publisher.publish(%Publish{topic: "test", arg_list: [1]})
+
+ assert [16, %{test: 2}, "test", [1], %{test: 1}] =
+ Publisher.publish(%Publish{
+ topic: "test",
+ arg_list: [1],
+ arg_kw: %{test: 1},
+ options: %{test: 2}
+ })
end
test "Subscriber.add" do
end
test "Subscriber.subscribe" do
- assert [32, %{}, "test"] = Subscriber.subscribe("test")
+ assert [32, %{test: 1}, "test"] =
+ Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}})
end
test "Subscriber.unsubscribe" do
- assert [34, 1234] = Subscriber.unsubscribe(1234)
+ assert [34, 1234] = Subscriber.unsubscribe(%Unsubscribe{subscription_id: 1234})
end
test "callee registration" do
{:ok, ["ok"], %{"color" => "#FFFFFF"}} =
Wampex.send_request(
caller_name,
- Caller.call("com.actuator.#{@device}.light", [1], %{color: "#FFFFFF"})
+ Caller.call(%Call{
+ procedure: "com.actuator.#{@device}.light",
+ arg_list: [1],
+ arg_kw: %{color: "#FFFFFF"}
+ })
)
assert_receive {:invocation, _, _, _, _, _}
Wampex.cast_send_request(
TestPublisher,
- Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"})
+ Publisher.publish(%Publish{
+ topic: "com.data.temp",
+ arg_list: [12.5, 45.6, 87.5],
+ arg_kw: %{loc: "60645"}
+ })
)
assert_receive {:event, _, _, _, _, _}