From: Christopher Date: Mon, 17 Feb 2020 19:30:53 +0000 (-0600) Subject: adds Registry based dispatch for invocations and subscriptions X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=bd948eb78c7888457d65bd6670b34d77c7643ab1;p=wampex.git adds Registry based dispatch for invocations and subscriptions --- diff --git a/config/config.exs b/config/config.exs new file mode 100644 index 0000000..19932f3 --- /dev/null +++ b/config/config.exs @@ -0,0 +1,3 @@ +use Mix.Config + +config :wampex, state_machine: "priv/session.json" diff --git a/lib/test.ex b/lib/test.ex index d995ea1..92fd049 100644 --- a/lib/test.ex +++ b/lib/test.ex @@ -16,10 +16,55 @@ defmodule Test do roles: @roles } + defmodule TestCallee do + use GenServer + require Logger + alias Wampex.Role.Callee + + @device "s87d6f8s7df6" + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok) + end + + def init(:ok) do + Wampex.register(TestCalleeSession, "com.actuator.#{@device}.light") + end + + def handle_info({:invocation, {id, _reg, _arg_l, arg_kw} = data, procedure}, state) do + Logger.info("Got invocation #{inspect(data)} from procedure #{inspect(procedure)}") + + Wampex.cast_send_request( + TestCalleeSession, + Callee.yield(id, [:ok], %{color: Map.get(arg_kw, "color")}) + ) + + {:noreply, state} + end + end + + defmodule TestSubscriber do + use GenServer + require Logger + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok) + end + + def init(:ok) do + Wampex.subscribe(TestSubscriberSession, "com.data.temp") + end + + def handle_info({:event, data, topic}, state) do + Logger.info("Got event #{inspect(data)} from topic #{inspect(topic)}") + {:noreply, state} + end + end + def register do - Wampex.start_link(TestCallee, @session) + Wampex.start_link(TestCalleeSession, @session) :timer.sleep(500) - Wampex.send_request(TestCallee, Callee.register("com.actuator.#{@device}.light")) + TestCallee.start_link([]) end def call do @@ -40,9 +85,9 @@ defmodule Test do end def subscribe do - Wampex.start_link(TestSubscriber, @session) + Wampex.start_link(TestSubscriberSession, @session) :timer.sleep(500) - Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp")) + Enum.each(1..5, fn _ -> TestSubscriber.start_link([]) end) end def publish do @@ -50,7 +95,7 @@ defmodule Test do :timer.sleep(500) Enum.each(1..10, fn _ -> - Wampex.send_request( + Wampex.cast_send_request( TestPublisher, Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"}, %{}) ) diff --git a/lib/wampex.ex b/lib/wampex.ex index 3c67cd2..850bba9 100644 --- a/lib/wampex.ex +++ b/lib/wampex.ex @@ -5,22 +5,25 @@ defmodule Wampex do use Supervisor alias Wampex.Session, as: Sess + alias Wampex.Role.{Callee, Subscriber} def start_link(name, %Sess{} = session_data) when is_atom(name) do Supervisor.start_link(__MODULE__, {name, session_data}, name: name) end def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do - session_name = session_name(name) + session = session_name(name) subscriber_registry = subscriber_registry_name(name) - transport_name = transport_name(name) - session_data = %Sess{session_data | transport_pid: transport_name} + callee_registry = callee_registry_name(name) + transport = transport_name(name) + session_data = %Sess{session_data | name: name, transport_pid: transport} children = [ - {Sess, [{:local, session_name}, session_data, []]}, - {t, - [url: url, session: session_name, protocol: p, serializer: s, opts: [name: transport_name]]}, - {Registry, [keys: :duplicate, name: subscriber_registry]} + {Sess, [{:local, session}, session_data, []]}, + {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]}, + {Registry, + [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]}, + {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]} ] Supervisor.init(children, strategy: :one_for_all) @@ -28,9 +31,26 @@ defmodule Wampex do def session_name(name), do: Module.concat([name, Session]) def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry]) + def callee_registry_name(name), do: Module.concat([name, CalleeRegistry]) def transport_name(name), do: Module.concat([name, Transport]) - def send_request(name, request) do - Sess.send_request(session_name(name), request) + def subscribe(name, topic, timeout \\ 5000) do + {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(topic), timeout) + Registry.register(subscriber_registry_name(name), id, topic) + {:ok, id} + end + + def register(name, procedure, timeout \\ 5000) do + {:ok, id} = Sess.send_request(session_name(name), Callee.register(procedure), timeout) + Registry.register(callee_registry_name(name), id, procedure) + {:ok, id} + end + + def send_request(name, request, timeout \\ 5000) do + Sess.send_request(session_name(name), request, timeout) + end + + def cast_send_request(name, request) do + Sess.cast_send_request(session_name(name), request) end end diff --git a/lib/wampex/messages.ex b/lib/wampex/messages.ex index 10adef2..f631c8c 100644 --- a/lib/wampex/messages.ex +++ b/lib/wampex/messages.ex @@ -1,121 +1,12 @@ defmodule Wampex.Messages do - alias StatesLanguage, as: SL - alias Wampex.Session - - require Logger - - @welcome 2 - @abort 3 - @goodbye 6 - @error 8 - @published 17 - @subscribed 33 - @unsubscribed 35 - @event 36 - @result 50 - @registered 65 - @unregistered 67 - @invocation 68 - - def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do - {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}]} - end - - def handle([@abort, _dets, reason], sl) do - Logger.warn("Session aborted: #{reason}") - {sl, [{:next_event, :internal, :abort}]} - end - - def handle([@goodbye, _dets, reason], sl) do - Logger.warn("Goodbye: #{reason}") - {sl, [{:next_event, :internal, :goodbye}]} - end - - def handle([@error, type, id, dets, error], sl) do - handle([@error, type, id, dets, error, [], %{}], sl) - end - - def handle([@error, type, id, dets, error, arg_l], sl) do - handle([@error, type, id, dets, error, arg_l, %{}], sl) - end - - def handle([@error, type, id, dets, error, arg_l, arg_kw], sl) do - Logger.error(""" - Request Type: #{type} - Id: #{id} - Details: #{inspect(dets)} - Error: #{error} - ArgList: #{inspect(arg_l)} - ArgKW: #{inspect(arg_kw)} - """) - - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@published, _request_id, id], sl) do - Logger.info("Published: #{id}") - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@subscribed, _request_id, id], sl) do - Logger.info("Subscribed: #{id}") - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@unsubscribed, request_id], sl) do - Logger.info("Unsubscribed: #{request_id}") - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@event, sub_id, pub_id, dets], sl) do - handle([@event, sub_id, pub_id, dets, [], %{}], sl) - end - - def handle([@event, sub_id, pub_id, dets, arg_l], sl) do - handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl) - end - - def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do - Logger.info( - "Got event for #{sub_id} from #{pub_id} with args #{inspect(arg_l)} and #{inspect(arg_kw)}" - ) - - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@result, id, dets], sl) do - handle([@result, id, dets, [], %{}], sl) - end - - def handle([@result, id, dets, arg_l], sl) do - handle([@result, id, dets, arg_l, %{}], sl) - end - - def handle([@result, id, _dets, arg_l, arg_kw], sl) do - Logger.info("Received result for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}") - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@unregistered, request_id], sl) do - Logger.info("Unregistered #{request_id}") - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@registered, _request_id, id], sl) do - Logger.info("Registered #{id}") - {sl, [{:next_event, :internal, :noop}]} - end - - def handle([@invocation, id, dets], sl) do - handle([@invocation, id, dets, [], %{}], sl) - end - - def handle([@invocation, id, dets, arg_l], sl) do - handle([@invocation, id, dets, arg_l, %{}], sl) - end - - def handle([@invocation, id, _dets, arg_l, arg_kw], sl) do - Logger.info("Received invocation for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}") - {sl, [{:next_event, :internal, :noop}]} + def handle(msg, data, roles) do + Enum.reduce_while(roles, nil, fn r, _ -> + try do + res = r.handle(msg, data) + {:halt, res} + rescue + FunctionClauseError -> {:cont, nil} + end + end) end end diff --git a/lib/wampex/roles/callee.ex b/lib/wampex/roles/callee.ex index 417a043..24b02ed 100644 --- a/lib/wampex/roles/callee.ex +++ b/lib/wampex/roles/callee.ex @@ -1,6 +1,12 @@ defmodule Wampex.Role.Callee do + @moduledoc false + @register 64 + @registered 65 @unregister 66 + @unregistered 67 + @invocation 68 + @yield 70 def add(roles) do Map.put(roles, :callee, %{}) @@ -13,4 +19,37 @@ defmodule Wampex.Role.Callee do def unregister(id) do [@unregister, id] end + + def yield(request_id) do + [@yield, request_id, %{}] + end + + def yield(request_id, arg_l) do + [@yield, request_id, %{}, arg_l] + end + + def yield(request_id, arg_l, arg_kw) do + [@yield, request_id, %{}, arg_l, arg_kw] + end + + def handle([@unregistered, request_id], sl) do + {sl, [{:next_event, :internal, :noop}], request_id, {:ok, request_id}} + end + + def handle([@registered, request_id, id], sl) do + {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}} + end + + def handle([@invocation, id, reg_id, dets], sl) do + handle([@invocation, id, reg_id, dets, [], %{}], sl) + end + + def handle([@invocation, id, reg_id, dets, arg_l], sl) do + handle([@invocation, id, dets, reg_id, arg_l, %{}], sl) + end + + def handle([@invocation, id, reg_id, _dets, arg_l, arg_kw], sl) do + {sl, [{:next_event, :internal, :invocation}], id, + {:update, :invocation, {id, reg_id, arg_l, arg_kw}}} + end end diff --git a/lib/wampex/roles/caller.ex b/lib/wampex/roles/caller.ex index 03cd00a..036db7d 100644 --- a/lib/wampex/roles/caller.ex +++ b/lib/wampex/roles/caller.ex @@ -1,5 +1,8 @@ defmodule Wampex.Role.Caller do + @moduledoc false + @call 48 + @result 50 def add(roles) do Map.put(roles, :caller, %{}) @@ -20,4 +23,16 @@ defmodule Wampex.Role.Caller do def call(procedure, args_l, args_kw, opts) do [@call, opts, procedure, args_l, args_kw] end + + def handle([@result, id, dets], sl) do + handle([@result, id, dets, [], %{}], sl) + end + + def handle([@result, id, dets, arg_l], sl) do + handle([@result, id, dets, arg_l, %{}], sl) + end + + def handle([@result, id, _dets, arg_l, arg_kw], sl) do + {sl, [{:next_event, :internal, :noop}], id, {:ok, arg_l, arg_kw}} + end end diff --git a/lib/wampex/roles/peer.ex b/lib/wampex/roles/peer.ex new file mode 100644 index 0000000..cce5847 --- /dev/null +++ b/lib/wampex/roles/peer.ex @@ -0,0 +1,42 @@ +defmodule Wampex.Role.Peer do + @moduledoc false + alias StatesLanguage, as: SL + alias Wampex.Session + + @hello 1 + @welcome 2 + @abort 3 + @goodbye 6 + @error 8 + + def add(roles), do: roles + + def hello(realm, roles) do + [@hello, realm, %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}] + end + + def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do + {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}], nil, + {:ok, session_id}} + end + + def handle([@abort, _dets, reason], sl) do + {sl, [{:next_event, :internal, :abort}], nil, {:error, reason}} + end + + def handle([@goodbye, _dets, reason], sl) do + {sl, [{:next_event, :internal, :goodbye}], nil, {:goodbye, reason}} + end + + def handle([@error, type, id, dets, error], sl) do + handle([@error, type, id, dets, error, [], %{}], sl) + end + + def handle([@error, type, id, dets, error, arg_l], sl) do + handle([@error, type, id, dets, error, arg_l, %{}], sl) + end + + def handle([@error, type, id, _dets, error, arg_l, arg_kw], sl) do + {sl, [{:next_event, :internal, :noop}], id, {:error, type, error, arg_l, arg_kw}} + end +end diff --git a/lib/wampex/roles/publisher.ex b/lib/wampex/roles/publisher.ex index 1207c2d..33b95bc 100644 --- a/lib/wampex/roles/publisher.ex +++ b/lib/wampex/roles/publisher.ex @@ -1,5 +1,8 @@ defmodule Wampex.Role.Publisher do + @moduledoc false + @publish 16 + @published 17 def add(roles) do Map.put(roles, :publisher, %{}) @@ -20,4 +23,8 @@ defmodule Wampex.Role.Publisher do def publish(topic, args_l, args_kw, opts) do [@publish, opts, topic, args_l, args_kw] end + + def handle([@published, request_id, id], sl) do + {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}} + end end diff --git a/lib/wampex/roles/subscriber.ex b/lib/wampex/roles/subscriber.ex index 84f4fe1..09fe31e 100644 --- a/lib/wampex/roles/subscriber.ex +++ b/lib/wampex/roles/subscriber.ex @@ -1,7 +1,11 @@ defmodule Wampex.Role.Subscriber do @moduledoc false + @subscribe 32 + @subscribed 33 @unsubscribe 34 + @unsubscribed 35 + @event 36 def add(roles) do Map.put(roles, :subscriber, %{}) @@ -14,4 +18,25 @@ defmodule Wampex.Role.Subscriber do def unsubscribe(sub_id) do [@unsubscribe, sub_id] end + + def handle([@subscribed, request_id, id], sl) do + {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}} + end + + def handle([@unsubscribed, request_id], sl) do + {sl, [{:next_event, :internal, :noop}], request_id, :ok} + end + + def handle([@event, sub_id, pub_id, dets], sl) do + handle([@event, sub_id, pub_id, dets, [], %{}], sl) + end + + def handle([@event, sub_id, pub_id, dets, arg_l], sl) do + handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl) + end + + def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do + {sl, [{:next_event, :internal, :event}], nil, + {:update, :event, {sub_id, pub_id, arg_l, arg_kw}}} + end end diff --git a/lib/wampex/session.ex b/lib/wampex/session.ex index c3acead..9599d72 100644 --- a/lib/wampex/session.ex +++ b/lib/wampex/session.ex @@ -4,22 +4,29 @@ defmodule Wampex.Session do @max_id 9_007_199_254_740_992 alias StatesLanguage, as: SL - alias Wampex.Session, as: Sess alias Wampex.{Messages, Realm} + alias Wampex.Role.{Callee, Peer} alias Wampex.Serializer.JSON + alias Wampex.Session, as: Sess alias Wampex.Transport.WebSocket + @yield 70 + defstruct [ :id, :url, :transport_pid, :message, + :event, + :invocation, :realm, + :name, roles: [], request_id: 0, transport: WebSocket, protocol: "wamp.2.json", - serializer: JSON + serializer: JSON, + requests: [] ] def get_request_id(current_id) when current_id == @max_id do @@ -30,19 +37,58 @@ defmodule Wampex.Session do current_id + 1 end - def send_request(p, request) do + def cast_send_request(p, request) do __MODULE__.cast(p, {:send_request, request}) end + def send_request(p, request, timeout) do + __MODULE__.call(p, {:send_request, request}, timeout) + end + + def do_send(r_id, tt, t, request) do + request_id = get_request_id(r_id) + + request = + case request do + [@yield | _] -> request + _ -> List.insert_at(request, 1, request_id) + end + + tt.send_request(t, request) + request_id + end + + def handle_call( + {:send_request, request}, + from, + _, + %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data + ) do + request_id = do_send(r_id, tt, t, request) + + {:ok, + %SL{ + data + | data: %Sess{ + sess + | request_id: request_id, + requests: [{request_id, from} | sess.requests] + } + }, []} + end + def handle_cast( {:send_request, request}, - "Established", + _, %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data ) do - request_id = get_request_id(r_id) - request = List.insert_at(request, 1, request_id) - tt.send_request(t, request) - {:ok, %SL{data | data: %Sess{sess | request_id: request_id}}, []} + request_id = do_send(r_id, tt, t, request) + + {:ok, + %SL{ + data + | data: %Sess{sess | request_id: request_id, requests: [{request_id, nil} | sess.requests]} + }, []} end def handle_resource( @@ -53,15 +99,7 @@ defmodule Wampex.Session do data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}} } = data ) do - tt.send_request( - t, - [ - 1, - realm, - %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)} - ] - ) - + tt.send_request(t, Peer.hello(realm, roles)) {:ok, data, [{:next_event, :internal, :hello_sent}]} end @@ -75,10 +113,18 @@ defmodule Wampex.Session do {:ok, data, []} end - def handle_resource("HandleMessage", _, _, %SL{data: %Sess{message: msg}} = data) do + def handle_resource( + "HandleMessage", + _, + _, + %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data + ) do Logger.info("Handling Message #{inspect(msg)}") - {data, actions} = Messages.handle(msg, data) - {:ok, data, actions} + {data, actions, id, response} = Messages.handle(msg, data, roles) + {data, response} = maybe_update_response(data, response) + {requests, actions} = resp = handle_response(id, actions, requests, response) + Logger.info("Response: #{inspect(resp)}") + {:ok, %SL{data | data: %Sess{data.data | requests: requests}}, actions} end def handle_resource("Abort", _, _, data) do @@ -93,7 +139,7 @@ defmodule Wampex.Session do data ) do Logger.info("Waiting for transport to connect...") - {:ok, data, []} + {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []} end def handle_resource("Established", _, "Established", data) do @@ -101,6 +147,39 @@ defmodule Wampex.Session do {:ok, data, []} end + def handle_resource( + "HandleEvent", + _, + "Event", + %SL{data: %Sess{name: name, event: event}} = sl + ) do + Logger.info("Received Event #{inspect(event)}") + + sub = Wampex.subscriber_registry_name(name) + + Registry.dispatch(sub, elem(event, 0), fn entries -> + for {pid, topic} <- entries, do: send(pid, {:event, event, topic}) + end) + + {:ok, sl, [{:next_event, :internal, :transition}]} + end + + def handle_resource( + "HandleInvocation", + _, + "Invocation", + %SL{data: %Sess{name: name, invocation: invocation}} = sl + ) do + Logger.info("Received Invocation #{inspect(invocation)}") + reg = Wampex.callee_registry_name(name) + + Registry.dispatch(reg, elem(invocation, 1), fn entries -> + for {pid, procedure} <- entries, do: send(pid, {:invocation, invocation, procedure}) + end) + + {:ok, sl, [{:next_event, :internal, :transition}]} + end + def handle_resource(resource, _, state, data) do Logger.info("No specific resource handler for #{resource} in state #{state}") {:ok, data, []} @@ -110,4 +189,32 @@ defmodule Wampex.Session do {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]} end + + def maybe_update_response(data, {:update, key, resp}) do + {%SL{data | data: Map.put(data.data, key, resp)}, resp} + end + + def maybe_update_response(data, resp), do: {data, resp} + + def handle_response(nil, actions, requests, _), do: {requests, actions} + + def handle_response(id, actions, requests, response) do + Enum.reduce_while(requests, {requests, actions}, fn + {^id, nil}, _ -> + {:halt, {remove_request(id, requests), actions}} + + {^id, from}, _ -> + {:halt, {remove_request(id, requests), [{:reply, from, response} | actions]}} + + {_, _}, acc -> + {:cont, acc} + end) + end + + def remove_request(id, requests) do + Enum.filter(requests, fn + {^id, _} -> false + {_id, _} -> true + end) + end end