--- /dev/null
+defmodule Wampex.Test do
+ alias Wampex.{Realm, Session}
+ alias Wampex.Role.{Callee, Caller, Publisher, Subscriber}
+
+ @url "ws://localhost:18080/ws"
+ @realm %Realm{name: "com.myrealm"}
+ @roles [Callee, Caller, Publisher, Subscriber]
+
+ def subscribe do
+ sess = %Session{url: @url, realm: @realm, roles: @roles}
+ Wampex.start_link(TestSubscriber, sess)
+ :timer.sleep(500)
+ Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp"))
+ end
+
+ def publish do
+ sess = %Session{url: @url, realm: @realm, roles: @roles}
+ Wampex.start_link(TestPublisher, sess)
+ :timer.sleep(500)
+
+ Enum.each(1..10, fn _ ->
+ Wampex.send_request(
+ TestPublisher,
+ Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"}, %{})
+ )
+
+ :timer.sleep(500)
+ end)
+ end
+end
@moduledoc """
Documentation for Wampex.
"""
- alias Wampex.Session
+ use Supervisor
- def test do
- url = "ws://localhost:18080/ws"
- sess = %Session{url: url}
- Session.start_link(sess)
+ alias Wampex.Session, as: Sess
+
+ def start_link(name, %Sess{} = session_data) when is_atom(name) do
+ Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
+ end
+
+ def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
+ session_name = session_name(name)
+ subscriber_registry = subscriber_registry_name(name)
+ transport_name = transport_name(name)
+ session_data = %Sess{session_data | transport_pid: transport_name}
+
+ children = [
+ {Sess, [{:local, session_name}, session_data, []]},
+ {t,
+ [url: url, session: session_name, protocol: p, serializer: s, opts: [name: transport_name]]},
+ {Registry, [keys: :duplicate, name: subscriber_registry]}
+ ]
+
+ Supervisor.init(children, strategy: :one_for_all)
+ end
+
+ def session_name(name), do: Module.concat([name, Session])
+ def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
+ def transport_name(name), do: Module.concat([name, Transport])
+
+ def send_request(name, request) do
+ Sess.send_request(session_name(name), request)
end
end
--- /dev/null
+defmodule Wampex.Realm do
+ @moduledoc false
+
+ defstruct [:name, :authentication]
+end
--- /dev/null
+defmodule Wampex.Role.Callee do
+ def add(roles) do
+ Map.put(roles, :callee, %{})
+ end
+end
--- /dev/null
+defmodule Wampex.Role.Caller do
+ def add(roles) do
+ Map.put(roles, :caller, %{})
+ end
+end
--- /dev/null
+defmodule Wampex.Role.Publisher do
+ @publish 16
+
+ def add(roles) do
+ Map.put(roles, :publisher, %{})
+ end
+
+ def publish(topic) do
+ [@publish, %{}, topic]
+ end
+
+ def publish(topic, opts) do
+ [@publish, opts, topic]
+ end
+
+ def publish(topic, args_l, opts) do
+ [@publish, opts, topic, args_l]
+ end
+
+ def publish(topic, args_l, args_kw, opts) do
+ [@publish, opts, topic, args_l, args_kw]
+ end
+end
--- /dev/null
+defmodule Wampex.Role.Subscriber do
+ @moduledoc false
+ @subscribe 32
+ @unsubscribe 34
+
+ def add(roles) do
+ Map.put(roles, :subscriber, %{})
+ end
+
+ def subscribe(topic, opts \\ %{}) when is_binary(topic) and is_map(opts) do
+ [@subscribe, opts, topic]
+ end
+
+ def unsubscribe(sub_id) do
+ [@unsubscribe, sub_id]
+ end
+end
defmodule Wampex.Session do
use StatesLanguage, data: "priv/session.json"
+ @max_id 9_007_199_254_740_992
+
alias StatesLanguage, as: SL
alias Wampex.Session, as: Sess
+ alias Wampex.Realm
alias Wampex.Serializer.JSON
alias Wampex.Transport.WebSocket
:url,
:transport_pid,
:message,
- request_id: 1,
+ :realm,
+ roles: [],
+ request_id: 0,
transport: WebSocket,
protocol: "wamp.2.json",
serializer: JSON
]
- def get_id do
- Enum.random(1..9_007_199_254_740_992)
+ def get_request_id(current_id) when current_id == @max_id do
+ 1
+ end
+
+ def get_request_id(current_id) do
+ current_id + 1
+ end
+
+ def send_request(p, request) do
+ __MODULE__.cast(p, {:send_request, request})
+ end
+
+ def handle_cast(
+ {:send_request, request},
+ "Established",
+ %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
+ ) do
+ request_id = get_request_id(r_id)
+ request = List.insert_at(request, 1, request_id)
+ tt.send_request(t, request)
+ {:ok, %SL{data | data: %Sess{sess | request_id: request_id}}, []}
end
def handle_resource(
"Hello",
_,
"Init",
- %SL{data: %Sess{transport: tt, transport_pid: t}} = data
+ %SL{
+ data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}}
+ } = data
) do
- tt.send_message(
+ tt.send_request(
t,
[
1,
- "com.myrealm",
- %{roles: %{publisher: %{}, subscriber: %{}, caller: %{}, callee: %{}}}
+ realm,
+ %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}
]
)
end
def handle_resource("HandleMessage", _, _, data) do
- Logger.info("Handling Message")
+ Logger.info("Handling Message #{inspect(data.data.message)}")
{:ok, data, [{:next_event, :internal, :noop}]}
end
"InitTransport",
_,
"WaitForTransport",
- %SL{data: %Sess{url: url, transport: t, serializer: s, protocol: p} = sess} = data
+ data
) do
- {:ok, pid} = t.start_link(url, self(), p, s)
- {:ok, %SL{data | data: %Sess{sess | transport_pid: pid}}, []}
+ Logger.info("Waiting for transport to connect...")
+ {:ok, data, []}
end
def handle_resource("Established", _, "Established", data) do
- Logger.info("Session #{data.data.id} Established")
+ Logger.info("Session #{data.data.id}")
{:ok, data, []}
end
require Logger
@header "Sec-WebSocket-Protocol"
+ @ping_interval 20_000
- def send_message(t, data) do
- WebSockex.cast(t, {:send_message, data})
+ def send_request(t, data) do
+ WebSockex.cast(t, {:send_request, data})
end
def connected?(t, resp) do
end
def start_link(
- url,
- session,
- protocol,
- serializer,
- opts \\ []
+ url: url,
+ session: session,
+ protocol: protocol,
+ serializer: serializer,
+ opts: opts
) do
url
|> Conn.new(extra_headers: [{@header, protocol}])
def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
Logger.info("Connected to #{h}:#{p}#{pa}?#{q}")
+ Process.send_after(self(), :ping, @ping_interval)
send(state.session, {:connected, true})
{:ok, %{state | connected: true}}
end
- def handle_cast({:send_message, data}, %{serializer: s} = state) do
+ def handle_ping(_ping, state) do
+ {:reply, :pong, state}
+ end
+
+ def handle_pong(_pong, state) do
+ {:ok, state}
+ end
+
+ def handle_cast({:send_request, data}, %{serializer: s} = state) do
+ Logger.info("Sending Request #{inspect(data)}")
{:reply, {s.data_type(), s.serialize!(data)}, state}
end
{:ok, state}
end
+ def handle_info(:ping, state) do
+ Process.send_after(self(), :ping, @ping_interval)
+ {:reply, :ping, state}
+ end
+
def handle_frame({type, data}, %{serializer: s} = state) do
Logger.info("Received #{type} data #{data}")
data = s.deserialize!(data)
def handle_message(message, state) do
send(state.session, {:set_message, message})
+ {:ok, state}
end
end