]> Entropealabs - wampex.git/commitdiff
use keyspace for ClusterKV
authorChristopher <chris@entropealabs.com>
Tue, 17 Mar 2020 17:48:17 +0000 (12:48 -0500)
committerChristopher <chris@entropealabs.com>
Tue, 17 Mar 2020 17:48:17 +0000 (12:48 -0500)
lib/roles/peer.ex
lib/router/session.ex
lib/wampex.ex

index 13c581e1a9342e312519430692c5f3f4d050596c..8bb79111f21eafbdbd868f2e39bb1db2bbb896b2 100644 (file)
@@ -33,7 +33,7 @@ defmodule Wampex.Roles.Peer do
 
     @type t :: %__MODULE__{
             session_id: integer(),
-            options: %{}
+            options: map()
           }
   end
 
index db54c401fba9d482c9c42ff16be670a177ff9b9c..f053c7ab927926f6e97ad3d7b55f010aca07830b 100644 (file)
@@ -90,12 +90,11 @@ defmodule Wampex.Router.Session do
   @register "Register"
   @unregister "Unregister"
   @subscribe "Subscribe"
-  @unsubscribe "Unsubscribe"
+  @unsubscribe "Unsubscribe"
   @publish "Publish"
-  @error "Error"
+  @error "Error"
   @abort "Abort"
   @goodbye "Goodbye"
-  @event "Cancel"
 
   ## Resources
   @handle_init "HandleInit"
@@ -108,12 +107,12 @@ defmodule Wampex.Router.Session do
   @handle_register "HandleRegister"
   @handle_unregister "HandleUnregister"
   @handle_subscribe "HandleSubscribe"
-  @handle_unsubscribe "HandleUnsubscribe"
+  @handle_unsubscribe "HandleUnsubscribe"
   @handle_publish "HandlePublish"
-  @handle_interrupt "HandleError"
+  # @handle_interrupt "HandleInterrupt"
   @handle_abort "HandleAbort"
   @handle_goodbye "HandleGoodbye"
-  @handle_cancel "HandleCancel"
+  @handle_cancel "HandleCancel"
 
   @impl true
   def handle_resource(
@@ -225,16 +224,16 @@ defmodule Wampex.Router.Session do
         @authenticate,
         %SL{
           data: %Sess{
+            id: session_id,
             transport: tt,
             transport_pid: t,
             authentication: auth,
             challenge: challenge,
-            authenticate: {:authenticate, sig, dets}
+            authenticate: {:authenticate, sig, _dets}
           }
         } = sl
       ) do
     ch = JSON.deserialize!(challenge.challenge)
-    session_id = get_in(ch, ["session"])
     authid = get_in(ch, ["authid"])
     authrole = get_in(ch, ["authrole"])
     authmethod = get_in(ch, ["authmethod"])
@@ -308,18 +307,13 @@ defmodule Wampex.Router.Session do
         } = sl
       ) do
     id = get_global_id()
-    key = "#{realm}:#{topic}"
-
-    debug("Handling Subscription #{inspect(key)} #{inspect(opts)}")
 
     case opts do
       %{"match" => "wildcard"} ->
-        Logger.info("Putting wildcard")
         ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
 
       _ ->
-        Logger.info("Putting exact")
-        ClusterKV.put(db, key, {id, {self(), Node.self()}})
+        ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
     end
 
     send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
@@ -345,16 +339,14 @@ defmodule Wampex.Router.Session do
         } = sl
       ) do
     pub_id = get_global_id()
-    key = "#{realm}:#{topic}"
 
     {_, _, subs} =
-      {db, key, []}
+      {db, {realm, topic}, []}
       |> get_subscribers()
       |> get_prefix()
-      |> get_wildcard(realm)
+      |> get_wildcard()
 
     subs = Enum.uniq(subs)
-    Logger.info("Subscribers: #{inspect(subs)}")
 
     Enum.each(subs, fn {id, {pid, node}} ->
       send(
@@ -392,11 +384,12 @@ defmodule Wampex.Router.Session do
               transport: tt,
               transport_pid: t,
               registrations: regs,
+              realm: realm,
               unregister: {:unregister, request_id, registration_id}
             } = data
         } = sl
       ) do
-    regs = remove_registration(regs, registration_id, db)
+    regs = remove_registration(regs, realm, registration_id, db)
 
     send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
     {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
@@ -415,14 +408,12 @@ defmodule Wampex.Router.Session do
               registrations: regs,
               db: db,
               realm: realm,
-              register: {:register, rid, opts, procedure}
+              register: {:register, rid, _opts, procedure}
             } = data
         } = sl
       ) do
-    key = "#{realm}:#{procedure}"
-    debug("Handling Registration #{inspect(key)}")
     id = get_global_id()
-    ClusterKV.put(db, key, {id, {self(), Node.self()}})
+    ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
     send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t)
 
     {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
@@ -439,16 +430,15 @@ defmodule Wampex.Router.Session do
             %Sess{
               db: db,
               proxy: proxy,
-              transport: tt,
-              transport_pid: t,
+              transport: _tt,
+              transport_pid: _t,
               realm: realm,
               request_id: ri,
               call: {:call, call_id, dets, proc, al, akw}
             } = data
         } = sl
       ) do
-    key = "#{realm}:#{proc}"
-    {_key, callees} = ClusterKV.get(db, key, 500)
+    {_key, callees} = ClusterKV.get(db, realm, proc, 500)
 
     {data, actions} =
       case get_live_callee(proxy, callees) do
@@ -548,7 +538,6 @@ defmodule Wampex.Router.Session do
         _,
         %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
       ) do
-    Logger.info("Received Invocation #{inspect(e)}")
     send_to_peer(Dealer.invocation(e), tt, t)
     {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
   end
@@ -573,7 +562,7 @@ defmodule Wampex.Router.Session do
 
   @impl true
   def handle_info(event, state, data) do
-    Logger.info("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
+    Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
     {:ok, data, []}
   end
 
@@ -584,11 +573,14 @@ defmodule Wampex.Router.Session do
           transport: tt,
           transport_pid: t,
           registrations: regs,
-          subscriptions: subs
+          realm: realm,
+          subscriptions: _subs
         }
       }) do
-    Enum.each(regs, fn {id, proc} ->
-      remove_registration(regs, id, db)
+    Logger.warn("Router session terminating #{inspect(reason)}")
+
+    Enum.each(regs, fn {id, _proc} ->
+      remove_registration(regs, realm, id, db)
     end)
 
     send_to_peer(
@@ -598,11 +590,11 @@ defmodule Wampex.Router.Session do
     )
   end
 
-  defp remove_registration(regs, registration_id, db) do
+  defp remove_registration(regs, realm, registration_id, db) do
     {id, proc} =
       reg =
       Enum.find(regs, fn
-        {^registration_id, proc} -> true
+        {^registration_id, _proc} -> true
         _ -> false
       end)
 
@@ -610,11 +602,11 @@ defmodule Wampex.Router.Session do
       {id, List.delete(values, value)}
     end
 
-    ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
+    ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
     List.delete(regs, reg)
   end
 
-  defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
+  defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
 
   defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
     case GenServer.call({proxy, node}, {:is_up, pid}) do
@@ -623,34 +615,21 @@ defmodule Wampex.Router.Session do
     end
   end
 
-  defp get_subscribers({db, key, acc}) do
-    case ClusterKV.get(db, key, 500) do
-      {_, subscribers} -> {db, key, acc ++ subscribers}
-      _ -> {db, key, acc}
+  defp get_subscribers({db, {keyspace, key}, acc}) do
+    case ClusterKV.get(db, keyspace, key, 500) do
+      {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
+      _ -> {db, {keyspace, key}, acc}
     end
   end
 
-  defp get_prefix({db, key, acc}) do
-    case ClusterKV.prefix(db, key, ".", 2, 500) do
-      {_, subscribers} ->
-        {db, key, acc ++ subscribers}
-
-      l when is_list(l) ->
-        {db, key, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
-
-      _ ->
-        {db, key, acc}
-    end
+  defp get_prefix({db, {keyspace, key}, acc}) do
+    l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
+    {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
   end
 
-  defp get_wildcard({db, key, acc}, realm) do
-    case ClusterKV.wildcard(db, realm, key, ".", ":", "") do
-      l when is_list(l) ->
-        {db, key, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
-
-      _ ->
-        {db, key, acc}
-    end
+  defp get_wildcard({db, {keyspace, key}, acc}) do
+    l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
+    {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
   end
 
   defp send_to_peer(msg, transport, pid) do
index 4bbddba389606efb12480a42ee5697abf479e0cd..7b0565e1b9bd3a879298dcb6b3487672e6ad69bb 100644 (file)
@@ -26,14 +26,13 @@ defmodule Wampex do
   @type register ::
           {:register, request_id :: integer(), details :: map(), procedure :: String.t()}
 
-  @type unregister ::
-          {:unregister, request_id :: integer(), id :: integer()}
+  @type unregister :: {:unregister, request_id :: integer(), registration_id :: integer()}
 
   @type unsubscribe ::
           {:unsubscribe, request_id :: integer(), id :: integer()}
 
   @type subscribe ::
-          {:subscribe, request_id :: integer(), topic :: String.t(), details :: map()}
+          {:subscribe, request_id :: integer(), details :: map(), topic :: String.t()}
   @type invocation ::
           {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(),
            arg_list :: arg_list(), arg_keywords :: arg_keyword()}
@@ -44,9 +43,8 @@ defmodule Wampex do
           {:error, reason :: binary()}
           | {:error, type :: integer(), error :: binary(), arg_list :: arg_list(),
              arg_keyword :: arg_keyword()}
-  @type handle_response ::
-          {:ok, integer()}
-          | publish()
+  @type messages ::
+          publish()
           | hello()
           | authenticate()
           | unsubscribe()
@@ -56,7 +54,11 @@ defmodule Wampex do
           | unregister()
           | call()
           | yield()
-          | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()}
           | error()
           | event()
+
+  @type handle_response ::
+          {:ok, integer()}
+          | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()}
+          | {:update, atom(), messages()}
 end