]> Entropealabs - wampex.git/commitdiff
adds Registry based dispatch for invocations and subscriptions
authorChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 19:30:53 +0000 (13:30 -0600)
committerChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 19:30:53 +0000 (13:30 -0600)
config/config.exs [new file with mode: 0644]
lib/test.ex
lib/wampex.ex
lib/wampex/messages.ex
lib/wampex/roles/callee.ex
lib/wampex/roles/caller.ex
lib/wampex/roles/peer.ex [new file with mode: 0644]
lib/wampex/roles/publisher.ex
lib/wampex/roles/subscriber.ex
lib/wampex/session.ex

diff --git a/config/config.exs b/config/config.exs
new file mode 100644 (file)
index 0000000..19932f3
--- /dev/null
@@ -0,0 +1,3 @@
+use Mix.Config
+
+config :wampex, state_machine: "priv/session.json"
index d995ea1f726ebb1093b83ac01015d29ffc1d2797..92fd049f7ef2abaaf3b095d3f52c8ecc70450975 100644 (file)
@@ -16,10 +16,55 @@ defmodule Test do
     roles: @roles
   }
 
+  defmodule TestCallee do
+    use GenServer
+    require Logger
+    alias Wampex.Role.Callee
+
+    @device "s87d6f8s7df6"
+
+    def start_link(_) do
+      GenServer.start_link(__MODULE__, :ok)
+    end
+
+    def init(:ok) do
+      Wampex.register(TestCalleeSession, "com.actuator.#{@device}.light")
+    end
+
+    def handle_info({:invocation, {id, _reg, _arg_l, arg_kw} = data, procedure}, state) do
+      Logger.info("Got invocation #{inspect(data)} from procedure #{inspect(procedure)}")
+
+      Wampex.cast_send_request(
+        TestCalleeSession,
+        Callee.yield(id, [:ok], %{color: Map.get(arg_kw, "color")})
+      )
+
+      {:noreply, state}
+    end
+  end
+
+  defmodule TestSubscriber do
+    use GenServer
+    require Logger
+
+    def start_link(_) do
+      GenServer.start_link(__MODULE__, :ok)
+    end
+
+    def init(:ok) do
+      Wampex.subscribe(TestSubscriberSession, "com.data.temp")
+    end
+
+    def handle_info({:event, data, topic}, state) do
+      Logger.info("Got event #{inspect(data)} from topic #{inspect(topic)}")
+      {:noreply, state}
+    end
+  end
+
   def register do
-    Wampex.start_link(TestCallee, @session)
+    Wampex.start_link(TestCalleeSession, @session)
     :timer.sleep(500)
-    Wampex.send_request(TestCallee, Callee.register("com.actuator.#{@device}.light"))
+    TestCallee.start_link([])
   end
 
   def call do
@@ -40,9 +85,9 @@ defmodule Test do
   end
 
   def subscribe do
-    Wampex.start_link(TestSubscriber, @session)
+    Wampex.start_link(TestSubscriberSession, @session)
     :timer.sleep(500)
-    Wampex.send_request(TestSubscriber, Subscriber.subscribe("com.data.temp"))
+    Enum.each(1..5, fn _ -> TestSubscriber.start_link([]) end)
   end
 
   def publish do
@@ -50,7 +95,7 @@ defmodule Test do
     :timer.sleep(500)
 
     Enum.each(1..10, fn _ ->
-      Wampex.send_request(
+      Wampex.cast_send_request(
         TestPublisher,
         Publisher.publish("com.data.temp", [12.5, 45.6, 87.5], %{loc: "60645"}, %{})
       )
index 3c67cd2b7ffdc6b215a16d596933a52f2d1d787d..850bba90e6414b2c64bfc18a6ccfd692ca5768a7 100644 (file)
@@ -5,22 +5,25 @@ defmodule Wampex do
   use Supervisor
 
   alias Wampex.Session, as: Sess
+  alias Wampex.Role.{Callee, Subscriber}
 
   def start_link(name, %Sess{} = session_data) when is_atom(name) do
     Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
   end
 
   def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
-    session_name = session_name(name)
+    session = session_name(name)
     subscriber_registry = subscriber_registry_name(name)
-    transport_name = transport_name(name)
-    session_data = %Sess{session_data | transport_pid: transport_name}
+    callee_registry = callee_registry_name(name)
+    transport = transport_name(name)
+    session_data = %Sess{session_data | name: name, transport_pid: transport}
 
     children = [
-      {Sess, [{:local, session_name}, session_data, []]},
-      {t,
-       [url: url, session: session_name, protocol: p, serializer: s, opts: [name: transport_name]]},
-      {Registry, [keys: :duplicate, name: subscriber_registry]}
+      {Sess, [{:local, session}, session_data, []]},
+      {t, [url: url, session: session, protocol: p, serializer: s, opts: [name: transport]]},
+      {Registry,
+       [keys: :duplicate, name: subscriber_registry, partitions: System.schedulers_online()]},
+      {Registry, [keys: :unique, name: callee_registry, partitions: System.schedulers_online()]}
     ]
 
     Supervisor.init(children, strategy: :one_for_all)
@@ -28,9 +31,26 @@ defmodule Wampex do
 
   def session_name(name), do: Module.concat([name, Session])
   def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
+  def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
   def transport_name(name), do: Module.concat([name, Transport])
 
-  def send_request(name, request) do
-    Sess.send_request(session_name(name), request)
+  def subscribe(name, topic, timeout \\ 5000) do
+    {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(topic), timeout)
+    Registry.register(subscriber_registry_name(name), id, topic)
+    {:ok, id}
+  end
+
+  def register(name, procedure, timeout \\ 5000) do
+    {:ok, id} = Sess.send_request(session_name(name), Callee.register(procedure), timeout)
+    Registry.register(callee_registry_name(name), id, procedure)
+    {:ok, id}
+  end
+
+  def send_request(name, request, timeout \\ 5000) do
+    Sess.send_request(session_name(name), request, timeout)
+  end
+
+  def cast_send_request(name, request) do
+    Sess.cast_send_request(session_name(name), request)
   end
 end
index 10adef2ef5d800615aa39e59aebe0b8616567906..f631c8ca0dfcca17f3e4b4656f1c2ab9a63add68 100644 (file)
 defmodule Wampex.Messages do
-  alias StatesLanguage, as: SL
-  alias Wampex.Session
-
-  require Logger
-
-  @welcome 2
-  @abort 3
-  @goodbye 6
-  @error 8
-  @published 17
-  @subscribed 33
-  @unsubscribed 35
-  @event 36
-  @result 50
-  @registered 65
-  @unregistered 67
-  @invocation 68
-
-  def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do
-    {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@abort, _dets, reason], sl) do
-    Logger.warn("Session aborted: #{reason}")
-    {sl, [{:next_event, :internal, :abort}]}
-  end
-
-  def handle([@goodbye, _dets, reason], sl) do
-    Logger.warn("Goodbye: #{reason}")
-    {sl, [{:next_event, :internal, :goodbye}]}
-  end
-
-  def handle([@error, type, id, dets, error], sl) do
-    handle([@error, type, id, dets, error, [], %{}], sl)
-  end
-
-  def handle([@error, type, id, dets, error, arg_l], sl) do
-    handle([@error, type, id, dets, error, arg_l, %{}], sl)
-  end
-
-  def handle([@error, type, id, dets, error, arg_l, arg_kw], sl) do
-    Logger.error("""
-    Request Type: #{type} 
-    Id: #{id}
-    Details: #{inspect(dets)}
-    Error: #{error}
-    ArgList: #{inspect(arg_l)}
-    ArgKW: #{inspect(arg_kw)}
-    """)
-
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@published, _request_id, id], sl) do
-    Logger.info("Published: #{id}")
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@subscribed, _request_id, id], sl) do
-    Logger.info("Subscribed: #{id}")
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@unsubscribed, request_id], sl) do
-    Logger.info("Unsubscribed: #{request_id}")
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@event, sub_id, pub_id, dets], sl) do
-    handle([@event, sub_id, pub_id, dets, [], %{}], sl)
-  end
-
-  def handle([@event, sub_id, pub_id, dets, arg_l], sl) do
-    handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl)
-  end
-
-  def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do
-    Logger.info(
-      "Got event for #{sub_id} from #{pub_id} with args #{inspect(arg_l)} and #{inspect(arg_kw)}"
-    )
-
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@result, id, dets], sl) do
-    handle([@result, id, dets, [], %{}], sl)
-  end
-
-  def handle([@result, id, dets, arg_l], sl) do
-    handle([@result, id, dets, arg_l, %{}], sl)
-  end
-
-  def handle([@result, id, _dets, arg_l, arg_kw], sl) do
-    Logger.info("Received result for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}")
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@unregistered, request_id], sl) do
-    Logger.info("Unregistered #{request_id}")
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@registered, _request_id, id], sl) do
-    Logger.info("Registered #{id}")
-    {sl, [{:next_event, :internal, :noop}]}
-  end
-
-  def handle([@invocation, id, dets], sl) do
-    handle([@invocation, id, dets, [], %{}], sl)
-  end
-
-  def handle([@invocation, id, dets, arg_l], sl) do
-    handle([@invocation, id, dets, arg_l, %{}], sl)
-  end
-
-  def handle([@invocation, id, _dets, arg_l, arg_kw], sl) do
-    Logger.info("Received invocation for request #{id} #{inspect(arg_l)}, #{inspect(arg_kw)}")
-    {sl, [{:next_event, :internal, :noop}]}
+  def handle(msg, data, roles) do
+    Enum.reduce_while(roles, nil, fn r, _ ->
+      try do
+        res = r.handle(msg, data)
+        {:halt, res}
+      rescue
+        FunctionClauseError -> {:cont, nil}
+      end
+    end)
   end
 end
index 417a043f50634ad6d954b5b29004d84d02654353..24b02eddab614bed0de8bebed15a4a6b6e6bc305 100644 (file)
@@ -1,6 +1,12 @@
 defmodule Wampex.Role.Callee do
+  @moduledoc false
+
   @register 64
+  @registered 65
   @unregister 66
+  @unregistered 67
+  @invocation 68
+  @yield 70
 
   def add(roles) do
     Map.put(roles, :callee, %{})
@@ -13,4 +19,37 @@ defmodule Wampex.Role.Callee do
   def unregister(id) do
     [@unregister, id]
   end
+
+  def yield(request_id) do
+    [@yield, request_id, %{}]
+  end
+
+  def yield(request_id, arg_l) do
+    [@yield, request_id, %{}, arg_l]
+  end
+
+  def yield(request_id, arg_l, arg_kw) do
+    [@yield, request_id, %{}, arg_l, arg_kw]
+  end
+
+  def handle([@unregistered, request_id], sl) do
+    {sl, [{:next_event, :internal, :noop}], request_id, {:ok, request_id}}
+  end
+
+  def handle([@registered, request_id, id], sl) do
+    {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
+  end
+
+  def handle([@invocation, id, reg_id, dets], sl) do
+    handle([@invocation, id, reg_id, dets, [], %{}], sl)
+  end
+
+  def handle([@invocation, id, reg_id, dets, arg_l], sl) do
+    handle([@invocation, id, dets, reg_id, arg_l, %{}], sl)
+  end
+
+  def handle([@invocation, id, reg_id, _dets, arg_l, arg_kw], sl) do
+    {sl, [{:next_event, :internal, :invocation}], id,
+     {:update, :invocation, {id, reg_id, arg_l, arg_kw}}}
+  end
 end
index 03cd00a507aa69f6ebb9a3d13f145e295088f65c..036db7de035487eaae92da9b9eacdb442cb41441 100644 (file)
@@ -1,5 +1,8 @@
 defmodule Wampex.Role.Caller do
+  @moduledoc false
+
   @call 48
+  @result 50
 
   def add(roles) do
     Map.put(roles, :caller, %{})
@@ -20,4 +23,16 @@ defmodule Wampex.Role.Caller do
   def call(procedure, args_l, args_kw, opts) do
     [@call, opts, procedure, args_l, args_kw]
   end
+
+  def handle([@result, id, dets], sl) do
+    handle([@result, id, dets, [], %{}], sl)
+  end
+
+  def handle([@result, id, dets, arg_l], sl) do
+    handle([@result, id, dets, arg_l, %{}], sl)
+  end
+
+  def handle([@result, id, _dets, arg_l, arg_kw], sl) do
+    {sl, [{:next_event, :internal, :noop}], id, {:ok, arg_l, arg_kw}}
+  end
 end
diff --git a/lib/wampex/roles/peer.ex b/lib/wampex/roles/peer.ex
new file mode 100644 (file)
index 0000000..cce5847
--- /dev/null
@@ -0,0 +1,42 @@
+defmodule Wampex.Role.Peer do
+  @moduledoc false
+  alias StatesLanguage, as: SL
+  alias Wampex.Session
+
+  @hello 1
+  @welcome 2
+  @abort 3
+  @goodbye 6
+  @error 8
+
+  def add(roles), do: roles
+
+  def hello(realm, roles) do
+    [@hello, realm, %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}]
+  end
+
+  def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do
+    {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}], nil,
+     {:ok, session_id}}
+  end
+
+  def handle([@abort, _dets, reason], sl) do
+    {sl, [{:next_event, :internal, :abort}], nil, {:error, reason}}
+  end
+
+  def handle([@goodbye, _dets, reason], sl) do
+    {sl, [{:next_event, :internal, :goodbye}], nil, {:goodbye, reason}}
+  end
+
+  def handle([@error, type, id, dets, error], sl) do
+    handle([@error, type, id, dets, error, [], %{}], sl)
+  end
+
+  def handle([@error, type, id, dets, error, arg_l], sl) do
+    handle([@error, type, id, dets, error, arg_l, %{}], sl)
+  end
+
+  def handle([@error, type, id, _dets, error, arg_l, arg_kw], sl) do
+    {sl, [{:next_event, :internal, :noop}], id, {:error, type, error, arg_l, arg_kw}}
+  end
+end
index 1207c2de7b59875d2b284103f0e8ec8cdedd6665..33b95bc6a8b78b59130e82f1fa005045291ee56b 100644 (file)
@@ -1,5 +1,8 @@
 defmodule Wampex.Role.Publisher do
+  @moduledoc false
+
   @publish 16
+  @published 17
 
   def add(roles) do
     Map.put(roles, :publisher, %{})
@@ -20,4 +23,8 @@ defmodule Wampex.Role.Publisher do
   def publish(topic, args_l, args_kw, opts) do
     [@publish, opts, topic, args_l, args_kw]
   end
+
+  def handle([@published, request_id, id], sl) do
+    {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
+  end
 end
index 84f4fe119315dbd789b56813aaccf38cd39eb23c..09fe31ebee317ae042e37a8da744ddac510ca4a5 100644 (file)
@@ -1,7 +1,11 @@
 defmodule Wampex.Role.Subscriber do
   @moduledoc false
+
   @subscribe 32
+  @subscribed 33
   @unsubscribe 34
+  @unsubscribed 35
+  @event 36
 
   def add(roles) do
     Map.put(roles, :subscriber, %{})
@@ -14,4 +18,25 @@ defmodule Wampex.Role.Subscriber do
   def unsubscribe(sub_id) do
     [@unsubscribe, sub_id]
   end
+
+  def handle([@subscribed, request_id, id], sl) do
+    {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
+  end
+
+  def handle([@unsubscribed, request_id], sl) do
+    {sl, [{:next_event, :internal, :noop}], request_id, :ok}
+  end
+
+  def handle([@event, sub_id, pub_id, dets], sl) do
+    handle([@event, sub_id, pub_id, dets, [], %{}], sl)
+  end
+
+  def handle([@event, sub_id, pub_id, dets, arg_l], sl) do
+    handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl)
+  end
+
+  def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do
+    {sl, [{:next_event, :internal, :event}], nil,
+     {:update, :event, {sub_id, pub_id, arg_l, arg_kw}}}
+  end
 end
index c3acead7362b414342463bf9b0df672564b3fd7d..9599d724d775f27d3e22e4ef78510fd19e63f14c 100644 (file)
@@ -4,22 +4,29 @@ defmodule Wampex.Session do
   @max_id 9_007_199_254_740_992
 
   alias StatesLanguage, as: SL
-  alias Wampex.Session, as: Sess
   alias Wampex.{Messages, Realm}
+  alias Wampex.Role.{Callee, Peer}
   alias Wampex.Serializer.JSON
+  alias Wampex.Session, as: Sess
   alias Wampex.Transport.WebSocket
 
+  @yield 70
+
   defstruct [
     :id,
     :url,
     :transport_pid,
     :message,
+    :event,
+    :invocation,
     :realm,
+    :name,
     roles: [],
     request_id: 0,
     transport: WebSocket,
     protocol: "wamp.2.json",
-    serializer: JSON
+    serializer: JSON,
+    requests: []
   ]
 
   def get_request_id(current_id) when current_id == @max_id do
@@ -30,19 +37,58 @@ defmodule Wampex.Session do
     current_id + 1
   end
 
-  def send_request(p, request) do
+  def cast_send_request(p, request) do
     __MODULE__.cast(p, {:send_request, request})
   end
 
+  def send_request(p, request, timeout) do
+    __MODULE__.call(p, {:send_request, request}, timeout)
+  end
+
+  def do_send(r_id, tt, t, request) do
+    request_id = get_request_id(r_id)
+
+    request =
+      case request do
+        [@yield | _] -> request
+        _ -> List.insert_at(request, 1, request_id)
+      end
+
+    tt.send_request(t, request)
+    request_id
+  end
+
+  def handle_call(
+        {:send_request, request},
+        from,
+        _,
+        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
+      ) do
+    request_id = do_send(r_id, tt, t, request)
+
+    {:ok,
+     %SL{
+       data
+       | data: %Sess{
+           sess
+           | request_id: request_id,
+             requests: [{request_id, from} | sess.requests]
+         }
+     }, []}
+  end
+
   def handle_cast(
         {:send_request, request},
-        "Established",
+        _,
         %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
       ) do
-    request_id = get_request_id(r_id)
-    request = List.insert_at(request, 1, request_id)
-    tt.send_request(t, request)
-    {:ok, %SL{data | data: %Sess{sess | request_id: request_id}}, []}
+    request_id = do_send(r_id, tt, t, request)
+
+    {:ok,
+     %SL{
+       data
+       | data: %Sess{sess | request_id: request_id, requests: [{request_id, nil} | sess.requests]}
+     }, []}
   end
 
   def handle_resource(
@@ -53,15 +99,7 @@ defmodule Wampex.Session do
           data: %Sess{transport: tt, transport_pid: t, roles: roles, realm: %Realm{name: realm}}
         } = data
       ) do
-    tt.send_request(
-      t,
-      [
-        1,
-        realm,
-        %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}
-      ]
-    )
-
+    tt.send_request(t, Peer.hello(realm, roles))
     {:ok, data, [{:next_event, :internal, :hello_sent}]}
   end
 
@@ -75,10 +113,18 @@ defmodule Wampex.Session do
     {:ok, data, []}
   end
 
-  def handle_resource("HandleMessage", _, _, %SL{data: %Sess{message: msg}} = data) do
+  def handle_resource(
+        "HandleMessage",
+        _,
+        _,
+        %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
+      ) do
     Logger.info("Handling Message #{inspect(msg)}")
-    {data, actions} = Messages.handle(msg, data)
-    {:ok, data, actions}
+    {data, actions, id, response} = Messages.handle(msg, data, roles)
+    {data, response} = maybe_update_response(data, response)
+    {requests, actions} = resp = handle_response(id, actions, requests, response)
+    Logger.info("Response: #{inspect(resp)}")
+    {:ok, %SL{data | data: %Sess{data.data | requests: requests}}, actions}
   end
 
   def handle_resource("Abort", _, _, data) do
@@ -93,7 +139,7 @@ defmodule Wampex.Session do
         data
       ) do
     Logger.info("Waiting for transport to connect...")
-    {:ok, data, []}
+    {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
   end
 
   def handle_resource("Established", _, "Established", data) do
@@ -101,6 +147,39 @@ defmodule Wampex.Session do
     {:ok, data, []}
   end
 
+  def handle_resource(
+        "HandleEvent",
+        _,
+        "Event",
+        %SL{data: %Sess{name: name, event: event}} = sl
+      ) do
+    Logger.info("Received Event #{inspect(event)}")
+
+    sub = Wampex.subscriber_registry_name(name)
+
+    Registry.dispatch(sub, elem(event, 0), fn entries ->
+      for {pid, topic} <- entries, do: send(pid, {:event, event, topic})
+    end)
+
+    {:ok, sl, [{:next_event, :internal, :transition}]}
+  end
+
+  def handle_resource(
+        "HandleInvocation",
+        _,
+        "Invocation",
+        %SL{data: %Sess{name: name, invocation: invocation}} = sl
+      ) do
+    Logger.info("Received Invocation #{inspect(invocation)}")
+    reg = Wampex.callee_registry_name(name)
+
+    Registry.dispatch(reg, elem(invocation, 1), fn entries ->
+      for {pid, procedure} <- entries, do: send(pid, {:invocation, invocation, procedure})
+    end)
+
+    {:ok, sl, [{:next_event, :internal, :transition}]}
+  end
+
   def handle_resource(resource, _, state, data) do
     Logger.info("No specific resource handler for #{resource} in state #{state}")
     {:ok, data, []}
@@ -110,4 +189,32 @@ defmodule Wampex.Session do
     {:ok, %SL{data | data: %Sess{sess | message: message}},
      [{:next_event, :internal, :message_received}]}
   end
+
+  def maybe_update_response(data, {:update, key, resp}) do
+    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
+  end
+
+  def maybe_update_response(data, resp), do: {data, resp}
+
+  def handle_response(nil, actions, requests, _), do: {requests, actions}
+
+  def handle_response(id, actions, requests, response) do
+    Enum.reduce_while(requests, {requests, actions}, fn
+      {^id, nil}, _ ->
+        {:halt, {remove_request(id, requests), actions}}
+
+      {^id, from}, _ ->
+        {:halt, {remove_request(id, requests), [{:reply, from, response} | actions]}}
+
+      {_, _}, acc ->
+        {:cont, acc}
+    end)
+  end
+
+  def remove_request(id, requests) do
+    Enum.filter(requests, fn
+      {^id, _} -> false
+      {_id, _} -> true
+    end)
+  end
 end