- [x] AUTHENTICATE
### RPC
-- [x] CANCEL
-- [x] INTERRUPT
+- [X] CANCEL
+- [X] INTERRUPT
- [x] Progressive Call Results
- [x] Progressive Calls
- [x] Call Canceling
A simple [Session](https://wamp-proto.org/_static/gen/wamp_latest.html#sessions) can be configured like so.
```elixir
-alias Wampex.{Realm, Session}
+alias Wampex.Client.Session
+alias Wampex.Realm
alias Wampex.Role.{Callee, Caller, Publisher, Subscriber}
url = "http://localhost:18080/ws"
roles = [Callee, Caller, Publisher, Subscriber]
session = %Session{url: url, realm: realm, roles: roles}
-Wampex.start_link(name: Connection, session: session)
+Wampex.Client.start_link(name: Connection, session: session)
```
The name can be anything you want, it is required, and must be unique among multiple WAMPex instances. See the [example application](https://gitlab.com/entropealabs/wampex_example_app) for an example of running multiple connections/sessions.
You can override the default serializer and transport like this.
```elixir
-alias Wampex.{Realm, Session}
+alias Wampex.Client.Session
+alias Wampex.Realm
alias Wampex.Role.{Callee, Caller, Publisher, Subscriber}
alias Wampex.Serializer.JSON
alias Wampex.Transport.WebSocket
roles = [Callee, Caller, Publisher, Subscriber]
session = %Session{url: url, realm: realm, roles: roles, protocol: "wamp.2.json", serializer: JSON}
-Wampex.start_link(name: Connection, session: session)
+Wampex.Client.start_link(name: Connection, session: session)
```
The protocol uses a registered [WebSocket subprotocol](https://wamp-proto.org/_static/gen/wamp_latest.html#websocket-transport), there are several values available. You need to ensure that the [Router](https://wamp-proto.org/_static/gen/wamp_latest.html#peers-and-roles) that you connect to also supports the subprotocol.
There's a Docker image available, so we'll use that to get up and running quickly.
-Make sure you are in the `WAMPex` directory before running this command.
+Make sure you are in the `wampex` directory before running this command.
```bash
docker run \
-d leapsight/bondy:0.8.8-slim
```
+### Configure Realm
+
+```bash
+curl -X "POST" "http://localhost:18081/realms/" \
+ -H 'Content-Type: application/json; charset=utf-8' \
+ -H 'Accept: application/json; charset=utf-8' \
+ -d $'{
+ "uri": "com.myrealm",
+ "description": "Test",
+ "security_enabled" : false
+}'
+```
+
This will setup a default realm `com.myrealm`.
This config allows anonymous users as well as authenticated ones. In reality you wouldn't want to enable both, but for testing, it's useful.
"password": "test1234"
}'
```
+
+### Run the tests
+
+Warning as errors, mix format, Dialyzer, Credo and Coveralls
+
+```bash
+$ MIX_ENV=test mix all_tests
+```
+
+Happy hacking ;)
{
"coverage_options": {
- "minimum_coverage": 80.8
+ "minimum_coverage": 77.6
},
"skip_files": [
"test/support"
--- /dev/null
+defmodule Wampex.Client do
+ @moduledoc """
+ Documentation for Wampex.
+ """
+ use Supervisor
+
+ alias Wampex.Client.Session, as: Sess
+ alias Wampex.Roles.{Callee, Subscriber}
+ alias Wampex.Roles.Callee.Register
+ alias Wampex.Roles.Subscriber.Subscribe
+
+ @spec start_link(name: atom(), session_data: Sess.t()) ::
+ {:ok, pid()}
+ | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
+ def start_link(name: name, session: session_data) when is_atom(name) do
+ Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
+ end
+
+ @spec init({atom(), Sess.t()}) ::
+ {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
+ def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
+ session = session_name(name)
+ subscriber_registry = subscriber_registry_name(name)
+ callee_registry = callee_registry_name(name)
+ transport = transport_name(name)
+ session_data = %Sess{session_data | name: name, transport_pid: transport}
+
+ children = [
+ {Sess, [{:local, session}, session_data, []]},
+ {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
+ {Registry,
+ [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
+ {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
+ ]
+
+ Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
+ end
+
+ @spec session_name(module()) :: module()
+ def session_name(name), do: Module.concat([name, Session])
+ @spec subscriber_registry_name(module()) :: module()
+ def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
+ @spec callee_registry_name(module()) :: module()
+ def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
+ @spec transport_name(module()) :: module()
+ def transport_name(name), do: Module.concat([name, Transport])
+
+ @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
+ {:ok, integer()}
+ def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
+ {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
+ Registry.register(subscriber_registry_name(name), id, t)
+ {:ok, id}
+ end
+
+ @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
+ {:ok, integer()}
+ def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
+ {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
+ Registry.register(callee_registry_name(name), id, p)
+ {:ok, id}
+ end
+
+ @spec send_request(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
+ term()
+ def send_request(name, request, timeout \\ 5000) do
+ Sess.send_request(session_name(name), request, timeout)
+ end
+
+ @spec cast_send_request(name :: module(), Wampex.message()) :: :ok
+ def cast_send_request(name, request) do
+ Sess.cast_send_request(session_name(name), request)
+ end
+end
-defmodule Wampex.Session do
+defmodule Wampex.Client.Session do
@moduledoc """
A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
"""
- use StatesLanguage, data: "priv/session.json"
+ use StatesLanguage, data: "priv/client.json"
@max_id 9_007_199_254_740_992
alias StatesLanguage, as: SL
alias Wampex.Realm
- alias Wampex.Role.Peer
- alias Wampex.Role.Peer.{Authenticate, Hello}
- alias Wampex.Serializer.MessagePack
+ alias Wampex.Roles.Peer
+ alias Wampex.Roles.Peer.{Authenticate, Hello}
+ alias Wampex.Serializers.MessagePack
alias __MODULE__, as: Sess
- alias Wampex.Transport.WebSocket
+ alias Wampex.Client
+ alias Wampex.Client.Transport.WebSocket
@yield 70
) do
Logger.debug("Received Event #{inspect(event)}")
- sub = Wampex.subscriber_registry_name(name)
+ sub = Client.subscriber_registry_name(name)
Registry.dispatch(sub, elem(event, 1), fn entries ->
for {pid, _topic} <- entries, do: send(pid, event)
%SL{data: %Sess{name: name, invocation: invocation}} = sl
) do
Logger.debug("Received Invocation #{inspect(invocation)}")
- reg = Wampex.callee_registry_name(name)
+ reg = Client.callee_registry_name(name)
Registry.dispatch(reg, elem(invocation, 2), fn entries ->
for {pid, _procedure} <- entries, do: send(pid, invocation)
@interrupt,
%SL{data: %Sess{interrupt: {id, opts}, name: name}} = data
) do
- [{callee, _procedure}] = Registry.lookup(Wampex.callee_registry_name(name), id)
+ [{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id)
send(callee, {:interrupt, id, opts})
{:ok, data, [{:next_event, :internal, :transition}]}
end
-defmodule Wampex.Transport.WebSocket do
+defmodule Wampex.Client.Transport.WebSocket do
use WebSockex
@behaviour Wampex.Transport
-defmodule Wampex.Role.Callee do
+defmodule Wampex.Roles.Callee do
@moduledoc """
Handles requests and responses for a Callee
"""
-defmodule Wampex.Role.Caller do
+defmodule Wampex.Roles.Caller do
@moduledoc """
Handles requests and responses for a Caller
"""
-defmodule Wampex.Role.Peer do
+defmodule Wampex.Roles.Peer do
@moduledoc """
Handles requests and responses for low-level Peer/Session interactions
"""
-defmodule Wampex.Role.Publisher do
+defmodule Wampex.Roles.Publisher do
@moduledoc """
Handles requests and responses for Publishers
"""
-defmodule Wampex.Role.Subscriber do
+defmodule Wampex.Roles.Subscriber do
@moduledoc """
Handles requests and responses for Subscribers
"""
--- /dev/null
+defmodule Wampex.Router do
+end
-defmodule Wampex.Serializer.JSON do
+defmodule Wampex.Serializers.JSON do
@moduledoc "JSON Serializer"
@behaviour Wampex.Serializer
-defmodule Wampex.Serializer.MessagePack do
+defmodule Wampex.Serializers.MessagePack do
@moduledoc "MessgePack Serializer"
@behaviour Wampex.Serializer
@callback send_request(transport :: atom() | pid(), message :: Wampex.message()) :: :ok
@callback start_link(
url: binary(),
- session: Wampex.Session.t(),
+ session: Wampex.Client.Session.t(),
protocol: binary(),
serializer: module(),
opts: []
defmodule Wampex do
@moduledoc """
- Documentation for Wampex.
+ Types for wampex
"""
- use Supervisor
-
- alias Wampex.Session, as: Sess
- alias Wampex.Role.{Callee, Subscriber}
- alias Wampex.Role.Callee.Register
- alias Wampex.Role.Subscriber.Subscribe
-
@type message_part :: integer() | binary() | map() | list()
@type message :: nonempty_list(message_part())
@type arg_list :: [] | nonempty_list(any())
| {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()}
| error()
| event()
-
- @spec start_link(name: atom(), session_data: Sess.t()) ::
- {:ok, pid()}
- | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
- def start_link(name: name, session: session_data) when is_atom(name) do
- Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
- end
-
- @spec init({atom(), Sess.t()}) ::
- {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
- def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
- session = session_name(name)
- subscriber_registry = subscriber_registry_name(name)
- callee_registry = callee_registry_name(name)
- transport = transport_name(name)
- session_data = %Sess{session_data | name: name, transport_pid: transport}
-
- children = [
- {Sess, [{:local, session}, session_data, []]},
- {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
- {Registry,
- [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
- {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
- ]
-
- Supervisor.init(children, strategy: :one_for_all, max_restarts: 0)
- end
-
- @spec session_name(module()) :: module()
- def session_name(name), do: Module.concat([name, Session])
- @spec subscriber_registry_name(module()) :: module()
- def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
- @spec callee_registry_name(module()) :: module()
- def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
- @spec transport_name(module()) :: module()
- def transport_name(name), do: Module.concat([name, Transport])
-
- @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
- {:ok, integer()}
- def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(sub), timeout)
- Registry.register(subscriber_registry_name(name), id, t)
- {:ok, id}
- end
-
- @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
- {:ok, integer()}
- def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
- {:ok, id} = Sess.send_request(session_name(name), Callee.register(reg), timeout)
- Registry.register(callee_registry_name(name), id, p)
- {:ok, id}
- end
-
- @spec send_request(name :: module(), request :: message(), timeout :: integer()) :: term()
- def send_request(name, request, timeout \\ 5000) do
- Sess.send_request(session_name(name), request, timeout)
- end
-
- @spec cast_send_request(name :: module(), message()) :: :ok
- def cast_send_request(name, request) do
- Sess.cast_send_request(session_name(name), request)
- end
end
@moduledoc false
use GenServer
require Logger
- alias Wampex.Role.Callee
+ alias Wampex.Client
+ alias Wampex.Roles.Callee
alias Callee.{Register, Unregister, Yield}
def start_link(test, name, device) do
end
def init({test, name, device}) do
- {:ok, reg} = Wampex.register(name, %Register{procedure: "com.actuator.#{device}.light"})
+ {:ok, reg} = Client.register(name, %Register{procedure: "com.actuator.#{device}.light"})
send(test, {:registered, reg})
{:ok, {test, name, reg}}
end
send(test, invocation)
- Wampex.cast_send_request(
+ Client.cast_send_request(
name,
Callee.yield(%Yield{
request_id: id,
end
def terminate(_reason, {_test, name, reg}) do
- Wampex.send_request(
+ Client.send_request(
name,
Callee.unregister(%Unregister{registration_id: reg})
)
@moduledoc false
use GenServer
require Logger
- alias Wampex.Role.Subscriber
+ alias Wampex.Client
+ alias Wampex.Roles.Subscriber
alias Subscriber.{Subscribe, Unsubscribe}
def start_link(test, name, topic) do
end
def init({test, name, topic}) do
- {:ok, sub} = Wampex.subscribe(name, %Subscribe{topic: topic})
+ {:ok, sub} = Client.subscribe(name, %Subscribe{topic: topic})
send(test, {:subscribed, sub})
{:ok, {test, name, sub}}
end
end
def terminate(_r, {_test, name, sub}) do
- Wampex.send_request(
+ Client.send_request(
name,
Subscriber.unsubscribe(%Unsubscribe{subscription_id: sub})
)
use ExUnit.Case, async: true
doctest Wampex
- alias Wampex.{Authentication, Realm, Session}
- alias Wampex.Role.{Callee, Caller, Peer, Publisher, Subscriber}
- alias Wampex.Serializer.{JSON, MessagePack}
+ alias Wampex.{Authentication, Realm}
+ alias Wampex.Client
+ alias Wampex.Roles.{Callee, Caller, Peer, Publisher, Subscriber}
+ alias Wampex.Serializers.{JSON, MessagePack}
alias Callee.{Register, Unregister, Yield}
alias Caller.Call
+ alias Client.Session
alias Peer.{Authenticate, Goodbye, Hello}
alias Publisher.Publish
alias Subscriber.{Subscribe, Unsubscribe}
@session %Session{url: @url, realm: @realm, roles: @roles}
test "session_name" do
- assert Test.Session = Wampex.session_name(Test)
+ assert Test.Session = Client.session_name(Test)
end
test "subscriber_registry_name" do
- assert Test.SubscriberRegistry = Wampex.subscriber_registry_name(Test)
+ assert Test.SubscriberRegistry = Client.subscriber_registry_name(Test)
end
test "callee_registry_name" do
- assert Test.CalleeRegistry = Wampex.callee_registry_name(Test)
+ assert Test.CalleeRegistry = Client.callee_registry_name(Test)
end
test "transport_name" do
- assert Test.Transport = Wampex.transport_name(Test)
+ assert Test.Transport = Client.transport_name(Test)
end
test "Callee.add" do
test "callee registration" do
name = TestCalleeRegistration
- Wampex.start_link(name: name, session: @session)
+ Client.start_link(name: name, session: @session)
TestCallee.start_link(self(), name, @device)
assert_receive {:registered, id}
end
test "authentication" do
callee_name = TestCalleeRespondAuthenticated
session = %Session{@session | realm: %Realm{@session.realm | authentication: @auth}}
- Wampex.start_link(name: callee_name, session: session)
+ Client.start_link(name: callee_name, session: session)
TestCallee.start_link(self(), callee_name, @device)
assert_receive {:registered, id}
end
test "abort" do
Process.flag(:trap_exit, true)
callee_name = TestAbort
- {:ok, pid} = Wampex.start_link(name: callee_name, session: @session)
- Wampex.cast_send_request(callee_name, Peer.hello(%Hello{realm: "test", roles: [Callee]}))
+ {:ok, pid} = Client.start_link(name: callee_name, session: @session)
+ Client.cast_send_request(callee_name, Peer.hello(%Hello{realm: "test", roles: [Callee]}))
assert_receive {:EXIT, ^pid, :shutdown}
end
test "callee is invoked and responds and caller gets result" do
callee_name = TestCalleeRespond
- Wampex.start_link(name: callee_name, session: @session)
+ Client.start_link(name: callee_name, session: @session)
TestCallee.start_link(self(), callee_name, @device)
assert_receive {:registered, id}
caller_name = TestCaller
- Wampex.start_link(name: caller_name, session: @session)
+ Client.start_link(name: caller_name, session: @session)
{:ok, %{}, ["ok"], %{"color" => "#FFFFFF"}} =
- Wampex.send_request(
+ Client.send_request(
caller_name,
Caller.call(%Call{
procedure: "com.actuator.#{@device}.light",
test "subscriber registration" do
name = TestSubscriberRegister
- Wampex.start_link(name: name, session: @session)
+ Client.start_link(name: name, session: @session)
TestSubscriber.start_link(self(), name, "com.data.temp")
assert_receive {:subscribed, id}
end
test "subscriber receives events from publisher" do
name = TestSubscriberEvents
- Wampex.start_link(name: name, session: @session)
+ Client.start_link(name: name, session: @session)
TestSubscriber.start_link(self(), name, "com.data.temp")
assert_receive {:subscribed, id}
- Wampex.start_link(name: TestPublisher, session: @session)
+ Client.start_link(name: TestPublisher, session: @session)
- Wampex.cast_send_request(
+ Client.cast_send_request(
TestPublisher,
Publisher.publish(%Publish{
topic: "com.data.temp",