]> Entropealabs - wampex.git/commitdiff
enumerate all result types, remove uneccessary states from JSON
authorChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 22:00:03 +0000 (16:00 -0600)
committerChristopher <chris@entropealabs.com>
Mon, 17 Feb 2020 22:00:03 +0000 (16:00 -0600)
19 files changed:
lib/test.ex
lib/wampex.ex
lib/wampex/messages.ex [deleted file]
lib/wampex/realm.ex
lib/wampex/role.ex [new file with mode: 0644]
lib/wampex/roles/callee.ex
lib/wampex/roles/caller.ex
lib/wampex/roles/peer.ex
lib/wampex/roles/publisher.ex
lib/wampex/roles/subscriber.ex
lib/wampex/serializer.ex [new file with mode: 0644]
lib/wampex/serializers/json.ex
lib/wampex/serializers/message_pack.ex
lib/wampex/session.ex
lib/wampex/transport.ex [new file with mode: 0644]
lib/wampex/transports/web_socket.ex
mix.exs
mix.lock
priv/session.json

index 92fd049f7ef2abaaf3b095d3f52c8ecc70450975..8144718c0eb859917ec612c4389d2e94e4d6e552 100644 (file)
@@ -1,20 +1,13 @@
 defmodule Test do
   alias Wampex.{Realm, Session}
   alias Wampex.Role.{Callee, Caller, Publisher, Subscriber}
-  alias Wampex.Serializer.MessagePack
 
   @url "ws://localhost:18080/ws"
   @realm %Realm{name: "com.myrealm"}
   @roles [Callee, Caller, Publisher, Subscriber]
   @device "s87d6f8s7df6"
 
-  @session %Session{
-    serializer: MessagePack,
-    protocol: "wamp.2.msgpack",
-    url: @url,
-    realm: @realm,
-    roles: @roles
-  }
+  @session Session.new(@url, @realm, @roles)
 
   defmodule TestCallee do
     use GenServer
@@ -31,8 +24,8 @@ defmodule Test do
       Wampex.register(TestCalleeSession, "com.actuator.#{@device}.light")
     end
 
-    def handle_info({:invocation, {id, _reg, _arg_l, arg_kw} = data, procedure}, state) do
-      Logger.info("Got invocation #{inspect(data)} from procedure #{inspect(procedure)}")
+    def handle_info({:invocation, id, _, _, _, arg_kw} = invocation, state) do
+      Logger.info("Got invocation #{inspect(invocation)}")
 
       Wampex.cast_send_request(
         TestCalleeSession,
@@ -55,8 +48,8 @@ defmodule Test do
       Wampex.subscribe(TestSubscriberSession, "com.data.temp")
     end
 
-    def handle_info({:event, data, topic}, state) do
-      Logger.info("Got event #{inspect(data)} from topic #{inspect(topic)}")
+    def handle_info(event, state) do
+      Logger.info("Got event #{inspect(event)}")
       {:noreply, state}
     end
   end
index 850bba90e6414b2c64bfc18a6ccfd692ca5768a7..56e0d6cc2eea3b06333a752981e9a42e9abcc5cd 100644 (file)
@@ -7,10 +7,30 @@ defmodule Wampex do
   alias Wampex.Session, as: Sess
   alias Wampex.Role.{Callee, Subscriber}
 
+  @type message_part :: integer() | binary() | map() | list()
+  @type message :: nonempty_list(message_part())
+  @type arg_list :: [] | nonempty_list(any())
+  @type arg_keyword :: map()
+  @type handle_response ::
+          {:ok, integer()}
+          | {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(),
+             arg_list :: arg_list(), arg_keywords :: arg_keyword()}
+          | {:ok, arg_list :: arg_list(), arg_keyword :: arg_keyword()}
+          | {:error, reason :: binary()}
+          | {:error, type :: integer(), error :: binary(), arg_list :: arg_list(),
+             arg_keyword :: arg_keyword()}
+          | {:event, subscription_id :: integer(), publication_id :: integer(), details :: map(),
+             arg_list :: arg_list(), arg_keyword :: arg_keyword()}
+
+  @spec start_link(name :: atom(), session_data :: Sess.t()) ::
+          {:ok, pid()}
+          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
   def start_link(name, %Sess{} = session_data) when is_atom(name) do
     Supervisor.start_link(__MODULE__, {name, session_data}, name: name)
   end
 
+  @spec init({atom(), Sess.t()}) ::
+          {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
   def init({name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data}) do
     session = session_name(name)
     subscriber_registry = subscriber_registry_name(name)
@@ -29,27 +49,36 @@ defmodule Wampex do
     Supervisor.init(children, strategy: :one_for_all)
   end
 
+  @spec session_name(module()) :: module()
   def session_name(name), do: Module.concat([name, Session])
+  @spec subscriber_registry_name(module()) :: module()
   def subscriber_registry_name(name), do: Module.concat([name, SubscriberRegistry])
+  @spec callee_registry_name(module()) :: module()
   def callee_registry_name(name), do: Module.concat([name, CalleeRegistry])
+  @spec transport_name(module()) :: module()
   def transport_name(name), do: Module.concat([name, Transport])
 
+  @spec subscribe(name :: module(), topic :: binary(), timeout :: integer()) :: {:ok, integer()}
   def subscribe(name, topic, timeout \\ 5000) do
     {:ok, id} = Sess.send_request(session_name(name), Subscriber.subscribe(topic), timeout)
     Registry.register(subscriber_registry_name(name), id, topic)
     {:ok, id}
   end
 
+  @spec register(name :: module(), procedure :: binary(), timeout :: integer()) ::
+          {:ok, integer()}
   def register(name, procedure, timeout \\ 5000) do
     {:ok, id} = Sess.send_request(session_name(name), Callee.register(procedure), timeout)
     Registry.register(callee_registry_name(name), id, procedure)
     {:ok, id}
   end
 
+  @spec send_request(name :: module(), request :: message(), timeout :: integer()) :: term()
   def send_request(name, request, timeout \\ 5000) do
     Sess.send_request(session_name(name), request, timeout)
   end
 
+  @spec cast_send_request(name :: module(), message()) :: :ok
   def cast_send_request(name, request) do
     Sess.cast_send_request(session_name(name), request)
   end
diff --git a/lib/wampex/messages.ex b/lib/wampex/messages.ex
deleted file mode 100644 (file)
index f631c8c..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-defmodule Wampex.Messages do
-  def handle(msg, data, roles) do
-    Enum.reduce_while(roles, nil, fn r, _ ->
-      try do
-        res = r.handle(msg, data)
-        {:halt, res}
-      rescue
-        FunctionClauseError -> {:cont, nil}
-      end
-    end)
-  end
-end
index 6e635158fe5cd72d564fcb93f9d5588ce6062082..ca93ee55908dff652f6741dc0d9b91d9fe5ba451 100644 (file)
@@ -2,4 +2,6 @@ defmodule Wampex.Realm do
   @moduledoc false
 
   defstruct [:name, :authentication]
+
+  @type t :: %__MODULE__{}
 end
diff --git a/lib/wampex/role.ex b/lib/wampex/role.ex
new file mode 100644 (file)
index 0000000..886d4d6
--- /dev/null
@@ -0,0 +1,6 @@
+defmodule Wampex.Role do
+  @callback add(map()) :: map()
+  @callback handle(message :: Wampex.message(), data :: StatesLanguage.t()) ::
+              {StatesLanguage.t(), [:gen_statem.action()], integer() | nil,
+               Wampex.handle_response()}
+end
index 24b02eddab614bed0de8bebed15a4a6b6e6bc305..bb4ee5ee57b2fbd908dcbccfa88f25f2c89b3c60 100644 (file)
@@ -1,5 +1,10 @@
 defmodule Wampex.Role.Callee do
-  @moduledoc false
+  @moduledoc """
+  Handles requests and responses for a Callee
+  """
+
+  alias Wampex.Role
+  @behaviour Role
 
   @register 64
   @registered 65
@@ -8,6 +13,7 @@ defmodule Wampex.Role.Callee do
   @invocation 68
   @yield 70
 
+  @impl true
   def add(roles) do
     Map.put(roles, :callee, %{})
   end
@@ -32,24 +38,29 @@ defmodule Wampex.Role.Callee do
     [@yield, request_id, %{}, arg_l, arg_kw]
   end
 
+  @impl true
   def handle([@unregistered, request_id], sl) do
     {sl, [{:next_event, :internal, :noop}], request_id, {:ok, request_id}}
   end
 
+  @impl true
   def handle([@registered, request_id, id], sl) do
     {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
   end
 
+  @impl true
   def handle([@invocation, id, reg_id, dets], sl) do
     handle([@invocation, id, reg_id, dets, [], %{}], sl)
   end
 
+  @impl true
   def handle([@invocation, id, reg_id, dets, arg_l], sl) do
     handle([@invocation, id, dets, reg_id, arg_l, %{}], sl)
   end
 
-  def handle([@invocation, id, reg_id, _dets, arg_l, arg_kw], sl) do
+  @impl true
+  def handle([@invocation, id, reg_id, dets, arg_l, arg_kw], sl) do
     {sl, [{:next_event, :internal, :invocation}], id,
-     {:update, :invocation, {id, reg_id, arg_l, arg_kw}}}
+     {:update, :invocation, {:invocation, id, reg_id, dets, arg_l, arg_kw}}}
   end
 end
index 036db7de035487eaae92da9b9eacdb442cb41441..5e593d4a7714d4aab0b71b1659455f75a858b86e 100644 (file)
@@ -1,9 +1,15 @@
 defmodule Wampex.Role.Caller do
-  @moduledoc false
+  @moduledoc """
+  Handles requests and responses for a Caller
+  """
+
+  alias Wampex.Role
+  @behaviour Role
 
   @call 48
   @result 50
 
+  @impl true
   def add(roles) do
     Map.put(roles, :caller, %{})
   end
@@ -24,14 +30,17 @@ defmodule Wampex.Role.Caller do
     [@call, opts, procedure, args_l, args_kw]
   end
 
+  @impl true
   def handle([@result, id, dets], sl) do
     handle([@result, id, dets, [], %{}], sl)
   end
 
+  @impl true
   def handle([@result, id, dets, arg_l], sl) do
     handle([@result, id, dets, arg_l, %{}], sl)
   end
 
+  @impl true
   def handle([@result, id, _dets, arg_l, arg_kw], sl) do
     {sl, [{:next_event, :internal, :noop}], id, {:ok, arg_l, arg_kw}}
   end
index cce5847cb6e016fcea5d2497486795a9fc3cecad..a2109bc50d4d8b0450247b0a5b436c221199ee06 100644 (file)
@@ -1,41 +1,57 @@
 defmodule Wampex.Role.Peer do
-  @moduledoc false
+  @moduledoc """
+  Handles requests and responses for low-level Peer/Session interactions
+  """
   alias StatesLanguage, as: SL
   alias Wampex.Session
 
+  alias Wampex.Role
+  @behaviour Role
+
   @hello 1
   @welcome 2
   @abort 3
   @goodbye 6
   @error 8
 
+  @impl true
   def add(roles), do: roles
 
   def hello(realm, roles) do
     [@hello, realm, %{roles: Enum.reduce(roles, %{}, fn r, acc -> r.add(acc) end)}]
   end
 
+  def goodbye(reason) do
+    [@goodbye, %{}, reason]
+  end
+
+  @impl true
   def handle([@welcome, session_id, _dets], %SL{data: %Session{} = data} = sl) do
     {%SL{sl | data: %Session{data | id: session_id}}, [{:next_event, :internal, :noop}], nil,
      {:ok, session_id}}
   end
 
+  @impl true
   def handle([@abort, _dets, reason], sl) do
     {sl, [{:next_event, :internal, :abort}], nil, {:error, reason}}
   end
 
+  @impl true
   def handle([@goodbye, _dets, reason], sl) do
-    {sl, [{:next_event, :internal, :goodbye}], nil, {:goodbye, reason}}
+    {sl, [{:next_event, :internal, :goodbye}], nil, {:update, :goodbye, reason}}
   end
 
+  @impl true
   def handle([@error, type, id, dets, error], sl) do
     handle([@error, type, id, dets, error, [], %{}], sl)
   end
 
+  @impl true
   def handle([@error, type, id, dets, error, arg_l], sl) do
     handle([@error, type, id, dets, error, arg_l, %{}], sl)
   end
 
+  @impl true
   def handle([@error, type, id, _dets, error, arg_l, arg_kw], sl) do
     {sl, [{:next_event, :internal, :noop}], id, {:error, type, error, arg_l, arg_kw}}
   end
index 33b95bc6a8b78b59130e82f1fa005045291ee56b..b7f90e8a1a8eed18bd2061528cd3336555531cb5 100644 (file)
@@ -1,9 +1,15 @@
 defmodule Wampex.Role.Publisher do
-  @moduledoc false
+  @moduledoc """
+  Handles requests and responses for Publishers
+  """
+
+  alias Wampex.Role
+  @behaviour Role
 
   @publish 16
   @published 17
 
+  @impl true
   def add(roles) do
     Map.put(roles, :publisher, %{})
   end
@@ -24,6 +30,7 @@ defmodule Wampex.Role.Publisher do
     [@publish, opts, topic, args_l, args_kw]
   end
 
+  @impl true
   def handle([@published, request_id, id], sl) do
     {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
   end
index 09fe31ebee317ae042e37a8da744ddac510ca4a5..a5481b8d0f60c0fe6b223ac043a9d8f82ca75a0f 100644 (file)
@@ -1,5 +1,10 @@
 defmodule Wampex.Role.Subscriber do
-  @moduledoc false
+  @moduledoc """
+  Handles requests and responses for Subscribers
+  """
+
+  alias Wampex.Role
+  @behaviour Role
 
   @subscribe 32
   @subscribed 33
@@ -7,6 +12,7 @@ defmodule Wampex.Role.Subscriber do
   @unsubscribed 35
   @event 36
 
+  @impl true
   def add(roles) do
     Map.put(roles, :subscriber, %{})
   end
@@ -19,24 +25,29 @@ defmodule Wampex.Role.Subscriber do
     [@unsubscribe, sub_id]
   end
 
+  @impl true
   def handle([@subscribed, request_id, id], sl) do
     {sl, [{:next_event, :internal, :noop}], request_id, {:ok, id}}
   end
 
+  @impl true
   def handle([@unsubscribed, request_id], sl) do
     {sl, [{:next_event, :internal, :noop}], request_id, :ok}
   end
 
+  @impl true
   def handle([@event, sub_id, pub_id, dets], sl) do
     handle([@event, sub_id, pub_id, dets, [], %{}], sl)
   end
 
+  @impl true
   def handle([@event, sub_id, pub_id, dets, arg_l], sl) do
     handle([@event, sub_id, pub_id, dets, arg_l, %{}], sl)
   end
 
-  def handle([@event, sub_id, pub_id, _dets, arg_l, arg_kw], sl) do
+  @impl true
+  def handle([@event, sub_id, pub_id, dets, arg_l, arg_kw], sl) do
     {sl, [{:next_event, :internal, :event}], nil,
-     {:update, :event, {sub_id, pub_id, arg_l, arg_kw}}}
+     {:update, :event, {:event, sub_id, pub_id, dets, arg_l, arg_kw}}}
   end
 end
diff --git a/lib/wampex/serializer.ex b/lib/wampex/serializer.ex
new file mode 100644 (file)
index 0000000..b795919
--- /dev/null
@@ -0,0 +1,8 @@
+defmodule Wampex.Serializer do
+  @type data_type :: :binary | :text
+  @callback data_type() :: data_type()
+  @callback serialize!(data :: Wampex.message()) :: binary()
+  @callback serialize(data :: Wampex.message()) :: {:ok, binary()}
+  @callback deserialize!(data :: binary()) :: Wampex.message()
+  @callback deserialize(data :: binary()) :: {:ok, Wampex.message()}
+end
index 99b45db1af41ef8bad2ebf747203e0279447f583..accb3a2441f525b1601fa9420e480b3a945f00ea 100644 (file)
@@ -1,7 +1,14 @@
 defmodule Wampex.Serializer.JSON do
+  @behaviour Wampex.Serializer
+
+  @impl true
   def data_type, do: :text
+  @impl true
   def serialize!(data), do: Jason.encode!(data)
+  @impl true
   def serialize(data), do: Jason.encode(data)
+  @impl true
   def deserialize!(binary), do: Jason.decode!(binary)
+  @impl true
   def deserialize(binary), do: Jason.decode(binary)
 end
index 65078b54146bf5283bfcdbc7d780bc9ad6f36925..36bc6802abed8fc833a26a0151edfbdd561bc8a0 100644 (file)
@@ -1,13 +1,19 @@
 defmodule Wampex.Serializer.MessagePack do
+  @behaviour Wampex.Serializer
+
+  @impl true
   def data_type, do: :binary
+  @impl true
   def serialize!(data), do: :msgpack.pack(data, [])
-  def serialize(data), do: :msgpack.pack(data, [])
-
+  @impl true
+  def serialize(data), do: {:ok, :msgpack.pack(data, [])}
+  @impl true
   def deserialize!(data) do
     {:ok, res} = :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}])
     res
   end
 
+  @impl true
   def deserialize(data),
     do: :msgpack.unpack(data, [{:known_atoms, []}, {:unpack_str, :as_binary}])
 end
index 9599d724d775f27d3e22e4ef78510fd19e63f14c..762335e5115a6caeb9330038198b6aae2c67c131 100644 (file)
@@ -4,10 +4,10 @@ defmodule Wampex.Session do
   @max_id 9_007_199_254_740_992
 
   alias StatesLanguage, as: SL
-  alias Wampex.{Messages, Realm}
-  alias Wampex.Role.{Callee, Peer}
-  alias Wampex.Serializer.JSON
-  alias Wampex.Session, as: Sess
+  alias Wampex.Realm
+  alias Wampex.Role.Peer
+  alias Wampex.Serializer.MessagePack
+  alias __MODULE__, as: Sess
   alias Wampex.Transport.WebSocket
 
   @yield 70
@@ -19,33 +19,58 @@ defmodule Wampex.Session do
     :message,
     :event,
     :invocation,
+    :goodbye,
     :realm,
     :name,
-    roles: [],
-    request_id: 0,
-    transport: WebSocket,
-    protocol: "wamp.2.json",
-    serializer: JSON,
+    :roles,
+    :request_id,
+    :transport,
+    :protocol,
+    :serializer,
     requests: []
   ]
 
-  def get_request_id(current_id) when current_id == @max_id do
-    1
-  end
-
-  def get_request_id(current_id) do
-    current_id + 1
+  @type t :: %__MODULE__{}
+
+  @spec new(
+          url :: binary(),
+          realm :: Realm.t(),
+          roles :: [module()],
+          transport :: module(),
+          protocol :: binary(),
+          serializer :: module()
+        ) :: t()
+  def new(
+        url,
+        %Realm{} = realm,
+        roles,
+        transport \\ WebSocket,
+        protocol \\ "wamp.2.msgpack",
+        serializer \\ MessagePack
+      ) do
+    %__MODULE__{
+      url: url,
+      realm: realm,
+      roles: roles,
+      transport: transport,
+      protocol: protocol,
+      serializer: serializer,
+      request_id: 0
+    }
   end
 
+  @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
   def cast_send_request(p, request) do
     __MODULE__.cast(p, {:send_request, request})
   end
 
+  @spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) ::
+          term()
   def send_request(p, request, timeout) do
     __MODULE__.call(p, {:send_request, request}, timeout)
   end
 
-  def do_send(r_id, tt, t, request) do
+  defp do_send(r_id, tt, t, request) do
     request_id = get_request_id(r_id)
 
     request =
@@ -58,6 +83,15 @@ defmodule Wampex.Session do
     request_id
   end
 
+  defp get_request_id(current_id) when current_id == @max_id do
+    1
+  end
+
+  defp get_request_id(current_id) do
+    current_id + 1
+  end
+
+  @impl true
   def handle_call(
         {:send_request, request},
         from,
@@ -77,6 +111,7 @@ defmodule Wampex.Session do
      }, []}
   end
 
+  @impl true
   def handle_cast(
         {:send_request, request},
         _,
@@ -91,6 +126,7 @@ defmodule Wampex.Session do
      }, []}
   end
 
+  @impl true
   def handle_resource(
         "Hello",
         _,
@@ -103,35 +139,39 @@ defmodule Wampex.Session do
     {:ok, data, [{:next_event, :internal, :hello_sent}]}
   end
 
+  @impl true
   def handle_resource(
         "GoodBye",
         _,
         "GoodBye",
-        %SL{data: %Sess{transport: tt, transport_pid: t}} = data
+        %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
       ) do
-    tt.send_message(t, [6, %{}, "i.dont.know"])
+    tt.send_message(t, Peer.goodbye(goodbye))
     {:ok, data, []}
   end
 
+  @impl true
   def handle_resource(
         "HandleMessage",
         _,
         _,
         %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
       ) do
-    Logger.info("Handling Message #{inspect(msg)}")
-    {data, actions, id, response} = Messages.handle(msg, data, roles)
+    Logger.debug("Handling Message #{inspect(msg)}")
+    {data, actions, id, response} = handle_message(msg, data, roles)
     {data, response} = maybe_update_response(data, response)
     {requests, actions} = resp = handle_response(id, actions, requests, response)
-    Logger.info("Response: #{inspect(resp)}")
+    Logger.debug("Response: #{inspect(resp)}")
     {:ok, %SL{data | data: %Sess{data.data | requests: requests}}, actions}
   end
 
-  def handle_resource("Abort", _, _, data) do
+  @impl true
+  def handle_resource("Abort", _, "Abort", data) do
     Logger.info("Aborting...")
     {:ok, data, []}
   end
 
+  @impl true
   def handle_resource(
         "InitTransport",
         _,
@@ -142,11 +182,13 @@ defmodule Wampex.Session do
     {:ok, %SL{data | data: %Sess{data.data | roles: [Peer | data.data.roles]}}, []}
   end
 
+  @impl true
   def handle_resource("Established", _, "Established", data) do
     Logger.info("Session #{data.data.id}")
     {:ok, data, []}
   end
 
+  @impl true
   def handle_resource(
         "HandleEvent",
         _,
@@ -157,13 +199,14 @@ defmodule Wampex.Session do
 
     sub = Wampex.subscriber_registry_name(name)
 
-    Registry.dispatch(sub, elem(event, 0), fn entries ->
-      for {pid, topic} <- entries, do: send(pid, {:event, event, topic})
+    Registry.dispatch(sub, elem(event, 1), fn entries ->
+      for {pid, topic} <- entries, do: send(pid, event)
     end)
 
     {:ok, sl, [{:next_event, :internal, :transition}]}
   end
 
+  @impl true
   def handle_resource(
         "HandleInvocation",
         _,
@@ -173,32 +216,34 @@ defmodule Wampex.Session do
     Logger.info("Received Invocation #{inspect(invocation)}")
     reg = Wampex.callee_registry_name(name)
 
-    Registry.dispatch(reg, elem(invocation, 1), fn entries ->
-      for {pid, procedure} <- entries, do: send(pid, {:invocation, invocation, procedure})
+    Registry.dispatch(reg, elem(invocation, 2), fn entries ->
+      for {pid, procedure} <- entries, do: send(pid, invocation)
     end)
 
     {:ok, sl, [{:next_event, :internal, :transition}]}
   end
 
+  @impl true
   def handle_resource(resource, _, state, data) do
     Logger.info("No specific resource handler for #{resource} in state #{state}")
     {:ok, data, []}
   end
 
+  @impl true
   def handle_info({:set_message, message}, "Established", %SL{data: %Sess{} = sess} = data) do
     {:ok, %SL{data | data: %Sess{sess | message: message}},
      [{:next_event, :internal, :message_received}]}
   end
 
-  def maybe_update_response(data, {:update, key, resp}) do
+  defp maybe_update_response(data, {:update, key, resp}) do
     {%SL{data | data: Map.put(data.data, key, resp)}, resp}
   end
 
-  def maybe_update_response(data, resp), do: {data, resp}
+  defp maybe_update_response(data, resp), do: {data, resp}
 
-  def handle_response(nil, actions, requests, _), do: {requests, actions}
+  defp handle_response(nil, actions, requests, _), do: {requests, actions}
 
-  def handle_response(id, actions, requests, response) do
+  defp handle_response(id, actions, requests, response) do
     Enum.reduce_while(requests, {requests, actions}, fn
       {^id, nil}, _ ->
         {:halt, {remove_request(id, requests), actions}}
@@ -211,10 +256,21 @@ defmodule Wampex.Session do
     end)
   end
 
-  def remove_request(id, requests) do
+  defp remove_request(id, requests) do
     Enum.filter(requests, fn
       {^id, _} -> false
       {_id, _} -> true
     end)
   end
+
+  defp handle_message(msg, data, roles) do
+    Enum.reduce_while(roles, nil, fn r, _ ->
+      try do
+        res = r.handle(msg, data)
+        {:halt, res}
+      rescue
+        FunctionClauseError -> {:cont, nil}
+      end
+    end)
+  end
 end
diff --git a/lib/wampex/transport.ex b/lib/wampex/transport.ex
new file mode 100644 (file)
index 0000000..c8d8d7a
--- /dev/null
@@ -0,0 +1,12 @@
+defmodule Wampex.Transport do
+  @callback send_request(atom() | pid(), Wampex.message()) :: :ok
+  @callback start_link(
+              url: binary(),
+              session: Wampex.Session.t(),
+              protocol: binary(),
+              serializer: module(),
+              opts: []
+            ) ::
+              {:ok, pid}
+              | {:error, term}
+end
index 61e6a8b3cb633f6459c37d07f5fe1ae8b3ef40d8..bd443817b2125a35410d7f58ba36f1732ffbf63d 100644 (file)
@@ -1,6 +1,8 @@
 defmodule Wampex.Transport.WebSocket do
   use WebSockex
 
+  @behaviour Wampex.Transport
+
   alias WebSockex.Conn
 
   require Logger
@@ -8,14 +10,12 @@ defmodule Wampex.Transport.WebSocket do
   @header "Sec-WebSocket-Protocol"
   @ping_interval 20_000
 
+  @impl true
   def send_request(t, data) do
     WebSockex.cast(t, {:send_request, data})
   end
 
-  def connected?(t, resp) do
-    WebSockex.cast(t, {:connected, resp})
-  end
-
+  @impl true
   def start_link(
         url: url,
         session: session,
@@ -32,6 +32,7 @@ defmodule Wampex.Transport.WebSocket do
     )
   end
 
+  @impl true
   def handle_connect(%Conn{host: h, port: p, path: pa, query: q} = _conn, state) do
     Logger.info("Connected to #{h}:#{p}#{pa}?#{q}")
     Process.send_after(self(), :ping, @ping_interval)
@@ -39,29 +40,35 @@ defmodule Wampex.Transport.WebSocket do
     {:ok, %{state | connected: true}}
   end
 
+  @impl true
   def handle_ping(_ping, state) do
     {:reply, :pong, state}
   end
 
+  @impl true
   def handle_pong(_pong, state) do
     {:ok, state}
   end
 
+  @impl true
   def handle_cast({:send_request, data}, %{serializer: s} = state) do
     Logger.info("Sending Request #{inspect(data)}")
     {:reply, {s.data_type(), s.serialize!(data)}, state}
   end
 
+  @impl true
   def handle_cast({:connected, resp}, state) do
     send(resp, {:connected, state.connected})
     {:ok, state}
   end
 
+  @impl true
   def handle_info(:ping, state) do
     Process.send_after(self(), :ping, @ping_interval)
     {:reply, :ping, state}
   end
 
+  @impl true
   def handle_frame({type, message}, %{serializer: s} = state) do
     message = s.deserialize!(message)
     Logger.info("Received #{type} data: #{inspect(message)}")
diff --git a/mix.exs b/mix.exs
index 071b1917824491f42784b23f856f003878c62361..659f1181c6ac1548e92b33560da8e2912380172f 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -11,22 +11,19 @@ defmodule Wampex.MixProject do
     ]
   end
 
-  # Run "mix help compile.app" to learn about applications.
   def application do
     [
       extra_applications: [:logger]
     ]
   end
 
-  # Run "mix help deps" to learn about dependencies.
   defp deps do
     [
+      {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
       {:jason, "~> 1.1"},
       {:msgpack, "~> 0.7.0"},
       {:states_language, "~> 0.2"},
       {:websockex, "~> 0.4"}
-      # {:dep_from_hexpm, "~> 0.3.0"},
-      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
     ]
   end
 end
index b73774c1cf866870d8b3dfe2bc506e913a24e48d..d4b8cf1da3733252397d307b121e7c7a61685f5e 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -1,5 +1,6 @@
 %{
   "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
+  "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
   "elixpath": {:hex, :elixpath, "0.1.0", "f860e931db7bda6856dc68145694ca429643cc068ef30d7ff6b4096d4357963e", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "30ce06079b41f1f5216ea2cd11605cfe4c82239628555cb3fde9f10055a6eb67"},
   "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
   "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
index 7cb0f50cc1f6ffcd74f96c72a8ed7b57360695cc..fca4d9d677cd4856f42d2fba3759d7df4882c001 100644 (file)
           "StringEquals": ":event",
           "Next": "Event"
         },
-        {
-          "StringEquals": ":published",
-          "Next": "Published"
-        },
-        {
-          "StringEquals": ":unsubscribed",
-          "Next": "Unsubscribed"
-        },
-        {
-          "StringEquals": ":subscribed",
-          "Next": "Subscribed"
-        },
-        {
-          "StringEquals": ":result",
-          "Next": "Result"
-        },
-        {
-          "StringEquals": ":registered",
-          "Next": "Registered"
-        },
-        {
-          "StringEquals": ":unregistered",
-          "Next": "Unregistered"
-        },
         {
           "StringEquals": ":invocation",
           "Next": "Invocation"
       "Resource": "HandleEvent",
       "Next": "Established"
     },
-    "Subscribed": {
-      "Type": "Task",
-      "Resource": "HandleSubscribed",
-      "Next": "Established"
-    },
-    "Published": {
-      "Type": "Task",
-      "Resource": "HandlePublished",
-      "Next": "Established"
-    },
-    "Unsubscribed": {
-      "Type": "Task",
-      "Resource": "HandleUnsubscribed",
-      "Next": "Established"
-    },
-    "Result": {
-      "Type": "Task",
-      "Resource": "HandleResult",
-      "Next": "Established"
-    },
-    "Registered": {
-      "Type": "Task",
-      "Resource": "HandleRegistered",
-      "Next": "Established"
-    },
-    "Unregistered": {
-      "Type": "Task",
-      "Resource": "HandleUnregistered",
-      "Next": "Established"
-    },
     "Invocation": {
       "Type": "Task",
       "Resource": "HandleInvocation",