--- /dev/null
+use Mix.Config
+
+config :wampex, state_machine: "priv/session.json"
roles: @roles
}
+ defmodule TestCallee do
+ use GenServer
+ require Logger
+ alias Wampex.Role.Callee
+
+ @device "s87d6f8s7df6"
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, :ok)
+ end
+
+ def init(:ok) do
+ Wampex.register(TestCalleeSession, "com.actuator.#{@device}.light")
+ end
+
+ def handle_info({:invocation, {id, _reg, _arg_l, arg_kw} = data, procedure}, state) do
+ Logger.info("Got invocation #{inspect(data)} from procedure #{inspect(procedure)}")
+
+ Wampex.cast_send_request(
+ TestCalleeSession,
+ Callee.yield(id, [:ok], %{color: Map.get(arg_kw, "color")})
+ )
+
+ {:noreply, state}
+ end
+ end
+
+ defmodule TestSubscriber do
+ use GenServer
+ require Logger
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, :ok)
+ end
+
+ def init(:ok) do
+ Wampex.subscribe(TestSubscriberSession, "com.data.temp")
+ end
+
+ def handle_info({:event, data, topic}, state) do
+ Logger.info("Got event #{inspect(data)} from topic #{inspect(topic)}")
+ {:noreply, state}
+ end
+ end
+
def register do
- Wampex.start_link(TestCallee, @session)
+ Wampex.start_link(TestCalleeSession, @session)
:timer.sleep(500)
- Wampex.send_request(TestCallee, Callee.register("com.actuator.#{@device}.light"))
+ TestCallee.start_link([])
end
def call do
end
def subscribe do
- Wampex.start_link(TestSubscriber, @session)
+ Wampex.start_link(TestSubscriberSession, @session)
:timer.sleep(500)
- Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp"))
+ Enum.each(1..5, fn _ -> TestSubscriber.start_link([]) end)
end
def publish do
:timer.sleep(500)
Enum.each(1..10, fn _ ->
- Wampex.send_request(
+ Wampex.cast_send_request(
TestPublisher,
Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"}, %{})
)
use Supervisor
alias Wampex.Session, as: Sess
+ alias Wampex.Role.{Callee, Subscriber}
def start_link(name, %Sess{} = session_data) when is_atom(name) do
Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
end
def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
- session_name = session_name(name)
+ session = session_name(name)
subscriber_registry = subscriber_registry_name(name)
- transport_name = transport_name(name)
- session_data = %Sess{session_data | transport_pid: transport_name}
+ callee_registry = callee_registry_name(name)
+ transport = transport_name(name)
+ session_data = %Sess{session_data | name: name, transport_pid: transport}
children = [
- {Sess, [{:local, session_name}, session_data, []]},
- {t,
- [url: url, session: session_name, protocol: p, serializer: s, opts: [name: transport_name]]},
- {Registry, [keys: :duplicate, name: subscriber_registry]}
+ {Sess, [{:local, session}, session_data, []]},
+ {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
+ {Registry,
+ [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
+ {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
]
Supervisor.init(children, strategy: :one_for_all)
def session_name(name), do: Module.concat([name, Session])
def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
+ def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
def transport_name(name), do: Module.concat([name, Transport])
- def send_request(name, request) do
- Sess.send_request(session_name(name), request)
+ def subscribe(name, topic, timeout \\ 5000) do
+ {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(topic), timeout)
+ Registry.register(subscriber_registry_name(name), id, topic)
+ {:ok, id}
+ end
+
+ def register(name, procedure, timeout \\ 5000) do
+ {:ok, id} = Sess.send_request(session_name(name), Callee.register(procedure), timeout)
+ Registry.register(callee_registry_name(name), id, procedure)
+ {:ok, id}
+ end
+
+ def send_request(name, request, timeout \\ 5000) do
+ Sess.send_request(session_name(name), request, timeout)
+ end
+
+ def cast_send_request(name, request) do
+ Sess.cast_send_request(session_name(name), request)
end
end
defmodule Wampex.Messages do
- alias StatesLanguage, as: SL
- alias Wampex.Session
-
- require Logger
-
- @welcome 2
- @abort 3
- @goodbye 6
- @error 8
- @published 17
- @subscribed 33
- @unsubscribed 35
- @event 36
- @result 50
- @registered 65
- @unregistered 67
- @invocation 68
-
- def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do
- {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@abort, _dets, reason], sl) do
- Logger.warn("Session aborted: #{reason}")
- {sl, [{:next_event, :internal, :abort}]}
- end
-
- def handle([@goodbye, _dets, reason], sl) do
- Logger.warn("Goodbye: #{reason}")
- {sl, [{:next_event, :internal, :goodbye}]}
- end
-
- def handle([@error, type, id, dets, error], sl) do
- handle([@error, type, id, dets, error, [], %{}], sl)
- end
-
- def handle([@error, type, id, dets, error, arg_l], sl) do
- handle([@error, type, id, dets, error, arg_l, %{}], sl)
- end
-
- def handle([@error, type, id, dets, error, arg_l, arg_kw], sl) do
- Logger.error("""
- Request Type: #{type}
- Id: #{id}
- Details: #{inspect(dets)}
- Error: #{error}
- ArgList: #{inspect(arg_l)}
- ArgKW: #{inspect(arg_kw)}
- """)
-
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@published, _request_id, id], sl) do
- Logger.info("Published: #{id}")
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@subscribed, _request_id, id], sl) do
- Logger.info("Subscribed: #{id}")
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@unsubscribed, request_id], sl) do
- Logger.info("Unsubscribed: #{request_id}")
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@event, sub_id, pub_id, dets], sl) do
- handle([@event, sub_id, pub_id, dets, [], %{}], sl)
- end
-
- def handle([@event, sub_id, pub_id, dets, arg_l], sl) do
- handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl)
- end
-
- def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do
- Logger.info(
- "Got event for #{sub_id} from #{pub_id} with args #{inspect(arg_l)} and #{inspect(arg_kw)}"
- )
-
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@result, id, dets], sl) do
- handle([@result, id, dets, [], %{}], sl)
- end
-
- def handle([@result, id, dets, arg_l], sl) do
- handle([@result, id, dets, arg_l, %{}], sl)
- end
-
- def handle([@result, id, _dets, arg_l, arg_kw], sl) do
- Logger.info("Received result for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}")
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@unregistered, request_id], sl) do
- Logger.info("Unregistered #{request_id}")
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@registered, _request_id, id], sl) do
- Logger.info("Registered #{id}")
- {sl, [{:next_event, :internal, :noop}]}
- end
-
- def handle([@invocation, id, dets], sl) do
- handle([@invocation, id, dets, [], %{}], sl)
- end
-
- def handle([@invocation, id, dets, arg_l], sl) do
- handle([@invocation, id, dets, arg_l, %{}], sl)
- end
-
- def handle([@invocation, id, _dets, arg_l, arg_kw], sl) do
- Logger.info("Received invocation for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}")
- {sl, [{:next_event, :internal, :noop}]}
+ def handle(msg, data, roles) do
+ Enum.reduce_while(roles, nil, fn r, _ ->
+ try do
+ res = r.handle(msg, data)
+ {:halt, res}
+ rescue
+ FunctionClauseError -> {:cont, nil}
+ end
+ end)
end
end
defmodule Wampex.Role.Callee do
+ @moduledoc false
+
@register 64
+ @registered 65
@unregister 66
+ @unregistered 67
+ @invocation 68
+ @yield 70
def add(roles) do
Map.put(roles, :callee, %{})
def unregister(id) do
[@unregister, id]
end
+
+ def yield(request_id) do
+ [@yield, request_id, %{}]
+ end
+
+ def yield(request_id, arg_l) do
+ [@yield, request_id, %{}, arg_l]
+ end
+
+ def yield(request_id, arg_l, arg_kw) do
+ [@yield, request_id, %{}, arg_l, arg_kw]
+ end
+
+ def handle([@unregistered, request_id], sl) do
+ {sl, [{:next_event, :internal, :noop}], request_id, {:ok, request_id}}
+ end
+
+ def handle([@registered, request_id, id], sl) do
+ {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
+ end
+
+ def handle([@invocation, id, reg_id, dets], sl) do
+ handle([@invocation, id, reg_id, dets, [], %{}], sl)
+ end
+
+ def handle([@invocation, id, reg_id, dets, arg_l], sl) do
+ handle([@invocation, id, dets, reg_id, arg_l, %{}], sl)
+ end
+
+ def handle([@invocation, id, reg_id, _dets, arg_l, arg_kw], sl) do
+ {sl, [{:next_event, :internal, :invocation}], id,
+ {:update, :invocation, {id, reg_id, arg_l, arg_kw}}}
+ end
end
defmodule Wampex.Role.Caller do
+ @moduledoc false
+
@call 48
+ @result 50
def add(roles) do
Map.put(roles, :caller, %{})
def call(procedure, args_l, args_kw, opts) do
[@call, opts, procedure, args_l, args_kw]
end
+
+ def handle([@result, id, dets], sl) do
+ handle([@result, id, dets, [], %{}], sl)
+ end
+
+ def handle([@result, id, dets, arg_l], sl) do
+ handle([@result, id, dets, arg_l, %{}], sl)
+ end
+
+ def handle([@result, id, _dets, arg_l, arg_kw], sl) do
+ {sl, [{:next_event, :internal, :noop}], id, {:ok, arg_l, arg_kw}}
+ end
end
--- /dev/null
+defmodule Wampex.Role.Peer do
+ @moduledoc false
+ alias StatesLanguage, as: SL
+ alias Wampex.Session
+
+ @hello 1
+ @welcome 2
+ @abort 3
+ @goodbye 6
+ @error 8
+
+ def add(roles), do: roles
+
+ def hello(realm, roles) do
+ [@hello, realm, %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}]
+ end
+
+ def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do
+ {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}], nil,
+ {:ok, session_id}}
+ end
+
+ def handle([@abort, _dets, reason], sl) do
+ {sl, [{:next_event, :internal, :abort}], nil, {:error, reason}}
+ end
+
+ def handle([@goodbye, _dets, reason], sl) do
+ {sl, [{:next_event, :internal, :goodbye}], nil, {:goodbye, reason}}
+ end
+
+ def handle([@error, type, id, dets, error], sl) do
+ handle([@error, type, id, dets, error, [], %{}], sl)
+ end
+
+ def handle([@error, type, id, dets, error, arg_l], sl) do
+ handle([@error, type, id, dets, error, arg_l, %{}], sl)
+ end
+
+ def handle([@error, type, id, _dets, error, arg_l, arg_kw], sl) do
+ {sl, [{:next_event, :internal, :noop}], id, {:error, type, error, arg_l, arg_kw}}
+ end
+end
defmodule Wampex.Role.Publisher do
+ @moduledoc false
+
@publish 16
+ @published 17
def add(roles) do
Map.put(roles, :publisher, %{})
def publish(topic, args_l, args_kw, opts) do
[@publish, opts, topic, args_l, args_kw]
end
+
+ def handle([@published, request_id, id], sl) do
+ {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
+ end
end
defmodule Wampex.Role.Subscriber do
@moduledoc false
+
@subscribe 32
+ @subscribed 33
@unsubscribe 34
+ @unsubscribed 35
+ @event 36
def add(roles) do
Map.put(roles, :subscriber, %{})
def unsubscribe(sub_id) do
[@unsubscribe, sub_id]
end
+
+ def handle([@subscribed, request_id, id], sl) do
+ {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
+ end
+
+ def handle([@unsubscribed, request_id], sl) do
+ {sl, [{:next_event, :internal, :noop}], request_id, :ok}
+ end
+
+ def handle([@event, sub_id, pub_id, dets], sl) do
+ handle([@event, sub_id, pub_id, dets, [], %{}], sl)
+ end
+
+ def handle([@event, sub_id, pub_id, dets, arg_l], sl) do
+ handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl)
+ end
+
+ def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do
+ {sl, [{:next_event, :internal, :event}], nil,
+ {:update, :event, {sub_id, pub_id, arg_l, arg_kw}}}
+ end
end
@max_id 9_007_199_254_740_992
alias StatesLanguage, as: SL
- alias Wampex.Session, as: Sess
alias Wampex.{Messages, Realm}
+ alias Wampex.Role.{Callee, Peer}
alias Wampex.Serializer.JSON
+ alias Wampex.Session, as: Sess
alias Wampex.Transport.WebSocket
+ @yield 70
+
defstruct [
:id,
:url,
:transport_pid,
:message,
+ :event,
+ :invocation,
:realm,
+ :name,
roles: [],
request_id: 0,
transport: WebSocket,
protocol: "wamp.2.json",
- serializer: JSON
+ serializer: JSON,
+ requests: []
]
def get_request_id(current_id) when current_id == @max_id do
current_id + 1
end
- def send_request(p, request) do
+ def cast_send_request(p, request) do
__MODULE__.cast(p, {:send_request, request})
end
+ def send_request(p, request, timeout) do
+ __MODULE__.call(p, {:send_request, request}, timeout)
+ end
+
+ def do_send(r_id, tt, t, request) do
+ request_id = get_request_id(r_id)
+
+ request =
+ case request do
+ [@yield | _] -> request
+ _ -> List.insert_at(request, 1, request_id)
+ end
+
+ tt.send_request(t, request)
+ request_id
+ end
+
+ def handle_call(
+ {:send_request, request},
+ from,
+ _,
+ %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
+ ) do
+ request_id = do_send(r_id, tt, t, request)
+
+ {:ok,
+ %SL{
+ data
+ | data: %Sess{
+ sess
+ | request_id: request_id,
+ requests: [{request_id, from} | sess.requests]
+ }
+ }, []}
+ end
+
def handle_cast(
{:send_request, request},
- "Established",
+ _,
%SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
) do
- request_id = get_request_id(r_id)
- request = List.insert_at(request, 1, request_id)
- tt.send_request(t, request)
- {:ok, %SL{data | data: %Sess{sess | request_id: request_id}}, []}
+ request_id = do_send(r_id, tt, t, request)
+
+ {:ok,
+ %SL{
+ data
+ | data: %Sess{sess | request_id: request_id, requests: [{request_id, nil} | sess.requests]}
+ }, []}
end
def handle_resource(
data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}}
} = data
) do
- tt.send_request(
- t,
- [
- 1,
- realm,
- %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}
- ]
- )
-
+ tt.send_request(t, Peer.hello(realm, roles))
{:ok, data, [{:next_event, :internal, :hello_sent}]}
end
{:ok, data, []}
end
- def handle_resource("HandleMessage", _, _, %SL{data: %Sess{message: msg}} = data) do
+ def handle_resource(
+ "HandleMessage",
+ _,
+ _,
+ %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
+ ) do
Logger.info("Handling Message #{inspect(msg)}")
- {data, actions} = Messages.handle(msg, data)
- {:ok, data, actions}
+ {data, actions, id, response} = Messages.handle(msg, data, roles)
+ {data, response} = maybe_update_response(data, response)
+ {requests, actions} = resp = handle_response(id, actions, requests, response)
+ Logger.info("Response: #{inspect(resp)}")
+ {:ok, %SL{data | data: %Sess{data.data | requests: requests}}, actions}
end
def handle_resource("Abort", _, _, data) do
data
) do
Logger.info("Waiting for transport to connect...")
- {:ok, data, []}
+ {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
end
def handle_resource("Established", _, "Established", data) do
{:ok, data, []}
end
+ def handle_resource(
+ "HandleEvent",
+ _,
+ "Event",
+ %SL{data: %Sess{name: name, event: event}} = sl
+ ) do
+ Logger.info("Received Event #{inspect(event)}")
+
+ sub = Wampex.subscriber_registry_name(name)
+
+ Registry.dispatch(sub, elem(event, 0), fn entries ->
+ for {pid, topic} <- entries, do: send(pid, {:event, event, topic})
+ end)
+
+ {:ok, sl, [{:next_event, :internal, :transition}]}
+ end
+
+ def handle_resource(
+ "HandleInvocation",
+ _,
+ "Invocation",
+ %SL{data: %Sess{name: name, invocation: invocation}} = sl
+ ) do
+ Logger.info("Received Invocation #{inspect(invocation)}")
+ reg = Wampex.callee_registry_name(name)
+
+ Registry.dispatch(reg, elem(invocation, 1), fn entries ->
+ for {pid, procedure} <- entries, do: send(pid, {:invocation, invocation, procedure})
+ end)
+
+ {:ok, sl, [{:next_event, :internal, :transition}]}
+ end
+
def handle_resource(resource, _, state, data) do
Logger.info("No specific resource handler for #{resource} in state #{state}")
{:ok, data, []}
{:ok, %SL{data | data: %Sess{sess | message: message}},
[{:next_event, :internal, :message_received}]}
end
+
+ def maybe_update_response(data, {:update, key, resp}) do
+ {%SL{data | data: Map.put(data.data, key, resp)}, resp}
+ end
+
+ def maybe_update_response(data, resp), do: {data, resp}
+
+ def handle_response(nil, actions, requests, _), do: {requests, actions}
+
+ def handle_response(id, actions, requests, response) do
+ Enum.reduce_while(requests, {requests, actions}, fn
+ {^id, nil}, _ ->
+ {:halt, {remove_request(id, requests), actions}}
+
+ {^id, from}, _ ->
+ {:halt, {remove_request(id, requests), [{:reply, from, response} | actions]}}
+
+ {_, _}, acc ->
+ {:cont, acc}
+ end)
+ end
+
+ def remove_request(id, requests) do
+ Enum.filter(requests, fn
+ {^id, _} -> false
+ {_id, _} -> true
+ end)
+ end
end