]> Entropealabs - wampex_router.git/commitdiff
move realm sepcific logic to it's own module
authorChristopher <chris@entropealabs.com>
Sat, 21 Mar 2020 19:15:18 +0000 (14:15 -0500)
committerChristopher <chris@entropealabs.com>
Sat, 21 Mar 2020 19:15:18 +0000 (14:15 -0500)
lib/router/admin.ex
lib/router/realms/session.ex [new file with mode: 0644]
lib/router/session.ex
lib/router/transports/web_socket.ex

index 77e9c2c73e3cd7cf2c65ad48f7b1e86e15a0dbe4..16204d9211e32a5ddbc10b934fa6a207d777f431 100644 (file)
@@ -5,7 +5,8 @@ defmodule Wampex.Router.Admin do
   alias Wampex.Roles.Dealer.{Invocation, Result}
   alias Wampex.Roles.Peer.Error
   alias Wampex.Router.Authentication.{Peer, Realm}
-  alias Wampex.Router.{Realms, Session}
+  alias Wampex.Router.Realms
+  alias Realms.Session, as: RealmSession
 
   @procedures [
     "admin.create_peer",
@@ -29,8 +30,8 @@ defmodule Wampex.Router.Admin do
     Logger.info("Registering admin procedures: #{inspect(@procedures)}")
 
     Enum.reduce(@procedures, regs, fn proc, acc ->
-      val = {Session.get_global_id(), {self(), Node.self()}}
-      ClusterKV.put(db, realm, proc, val)
+      val = {RealmSession.get_id(), {self(), Node.self()}}
+      RealmSession.register(db, realm, proc, val)
       [{proc, val} | acc]
     end)
   end
diff --git a/lib/router/realms/session.ex b/lib/router/realms/session.ex
new file mode 100644 (file)
index 0000000..31aacbd
--- /dev/null
@@ -0,0 +1,123 @@
+defmodule Wampex.Router.Realms.Session do
+  @moduledoc false
+
+  @max_id 9_007_199_254_740_992
+
+  def register(db, realm, procedure, val) do
+    ClusterKV.put(db, realm, procedure, val)
+    ClusterKV.put(db, realm, "#{procedure}:callee_index", 0)
+  end
+
+  def unregister(db, realm, {id, _from} = value, regs) do
+    {_id, proc} = reg = get_registration(id, regs)
+    ClusterKV.update(db, realm, proc, value, &upsert_remove/2)
+    List.delete(regs, reg)
+  end
+
+  def callees(db, realm, procedure) do
+    with {_key, callees} <- ClusterKV.get(db, realm, procedure, 500),
+         {_key, [callee_index]} <- ClusterKV.get(db, realm, "#{procedure}:callee_index") do
+      {callees, callee_index}
+    end
+  end
+
+  def round_robin(db, realm, procedure, len) do
+    ClusterKV.update(db, realm, "#{procedure}:callee_index", len, &update_round_robin/2)
+  end
+
+  def subscribe(db, realm, topic, val, %{"match" => "wildcard"}) do
+    put_wildcard(db, realm, topic, val)
+    true
+  end
+
+  def subscribe(db, realm, topic, val, _) do
+    ClusterKV.put(db, realm, topic, val)
+    false
+  end
+
+  def unsubscribe(db, realm, {id, _from} = value, subscriptions) do
+    {_id, topic, wc} = sub = get_subscription(id, subscriptions)
+
+    {key, filter} =
+      case wc do
+        true ->
+          key = get_wildcard_key(topic)
+          {key, &upsert_remove/2}
+
+        false ->
+          {topic, &upsert_remove/2}
+      end
+
+    ClusterKV.update(db, realm, key, value, filter)
+    remove_val(sub, subscriptions)
+  end
+
+  def subscriptions(db, realm, topic) do
+    {_, _, subs} =
+      {db, {realm, topic}, []}
+      |> get_subscribers()
+      |> get_prefix()
+      |> get_wildcard()
+
+    Enum.uniq(subs)
+  end
+
+  def get_id do
+    Enum.random(0..@max_id)
+  end
+
+  def get_request_id(current_id) when current_id == @max_id do
+    1
+  end
+
+  def get_request_id(current_id) do
+    current_id + 1
+  end
+
+  defp update_round_robin({id, [values]}, value) do
+    case values + 1 do
+      ni when ni < value -> {id, [ni]}
+      _ -> {id, [0]}
+    end
+  end
+
+  defp get_subscribers({db, {keyspace, key}, acc}) do
+    case ClusterKV.get(db, keyspace, key, 500) do
+      {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
+      _ -> {db, {keyspace, key}, acc}
+    end
+  end
+
+  defp get_prefix({db, {keyspace, key}, acc}) do
+    l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
+    {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
+  end
+
+  defp get_wildcard({db, {keyspace, key}, acc}) do
+    l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
+    {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
+  end
+
+  defp get_registration(id, regs) do
+    Enum.find(regs, fn
+      {^id, _proc} -> true
+      _ -> false
+    end)
+  end
+
+  defp get_subscription(id, subscriptions) do
+    Enum.find(subscriptions, fn
+      {^id, _topic, _} -> true
+      _ -> false
+    end)
+  end
+
+  defp remove_val(val, values) do
+    List.delete(values, val)
+  end
+
+  defp upsert_remove({id, values}, value), do: {id, remove_val(value, values)}
+
+  defp put_wildcard(db, realm, topic, val), do: ClusterKV.put_wildcard(db, realm, topic, val, ".", ":", "")
+  defp get_wildcard_key(topic), do: ClusterKV.get_wildcard_key(topic, ".", ":", "")
+end
index 8406778ba4d00f56913342eba510e7825c5f7710..4dede66bc97a0a506285edf750faa88a2183a78b 100644 (file)
@@ -4,8 +4,6 @@ defmodule Wampex.Router.Session do
   """
   use StatesLanguage, data: "priv/router.json"
 
-  @max_id 9_007_199_254_740_992
-
   alias __MODULE__, as: Sess
   alias StatesLanguage, as: SL
   alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
@@ -14,6 +12,7 @@ defmodule Wampex.Router.Session do
   alias Broker.{Subscribed, Published, Event, Unsubscribed}
   alias Dealer.{Invocation, Registered, Result, Unregistered}
   alias Router.{Authentication, Realms}
+  alias Realms.Session, as: RealmSession
 
   @enforce_keys [:transport, :transport_pid]
   defstruct [
@@ -110,11 +109,7 @@ defmodule Wampex.Router.Session do
   # @handle_interrupt "HandleInterrupt"
   @handle_abort "HandleAbort"
   @handle_goodbye "HandleGoodbye"
-  # @handle_cancel "HandleCancel"
-
-  def get_global_id do
-    Enum.random(0..@max_id)
-  end
+  # @handle_cancel "HandleCancel" 
 
   @impl true
   def handle_resource(
@@ -125,7 +120,7 @@ defmodule Wampex.Router.Session do
       ) do
     debug("Init")
 
-    {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]}
+    {:ok, %SL{data | data: %Sess{data.data | id: RealmSession.get_id()}}, [{:next_event, :internal, :transition}]}
   end
 
   @impl true
@@ -258,18 +253,9 @@ defmodule Wampex.Router.Session do
             } = data
         } = sl
       ) do
-    id = get_global_id()
+    id = RealmSession.get_id()
 
-    wc =
-      case opts do
-        %{"match" => "wildcard"} ->
-          ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
-          true
-
-        _ ->
-          ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
-          false
-      end
+    wc = RealmSession.subscribe(db, realm, topic, {id, {self(), Node.self()}}, opts)
 
     send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
 
@@ -293,7 +279,7 @@ defmodule Wampex.Router.Session do
           }
         } = sl
       ) do
-    subs = remove_subscription(subs, subscription_id, realm, db)
+    subs = RealmSession.unsubscribe(db, realm, {subscription_id, {self(), Node.self()}}, subs)
 
     send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
 
@@ -316,15 +302,9 @@ defmodule Wampex.Router.Session do
           }
         } = sl
       ) do
-    pub_id = get_global_id()
+    pub_id = RealmSession.get_id()
 
-    {_, _, subs} =
-      {db, {realm, topic}, []}
-      |> get_subscribers()
-      |> get_prefix()
-      |> get_wildcard()
-
-    subs = Enum.uniq(subs)
+    subs = RealmSession.subscriptions(db, realm, topic)
 
     Enum.each(subs, fn {id, {pid, node}} ->
       send(
@@ -367,7 +347,7 @@ defmodule Wampex.Router.Session do
             } = data
         } = sl
       ) do
-    regs = remove_registration(regs, realm, registration_id, db)
+    regs = RealmSession.unregister(db, realm, {registration_id, {self(), Node.self()}}, regs)
 
     send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
     {:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
@@ -390,8 +370,8 @@ defmodule Wampex.Router.Session do
             } = data
         } = sl
       ) do
-    id = get_global_id()
-    ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
+    id = RealmSession.get_id()
+    RealmSession.register(db, realm, procedure, {id, {self(), Node.self()}})
     regd = %Registered{request_id: rid, registration_id: id}
     send_to_peer(Dealer.registered(regd), tt, t)
 
@@ -418,9 +398,9 @@ defmodule Wampex.Router.Session do
         } = sl
       ) do
     data =
-      with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
-           {id, {pid, node}} <- get_live_callee(proxy, callees) do
-        req_id = get_request_id(ri)
+      with {callees, index} <- RealmSession.callees(db, realm, proc),
+           {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do
+        req_id = RealmSession.get_request_id(ri)
         dets = Map.put(dets, "procedure", proc)
 
         send(
@@ -441,6 +421,8 @@ defmodule Wampex.Router.Session do
           }
         )
 
+        RealmSession.round_robin(db, realm, proc, length(callees))
+
         %SL{sl | data: %Sess{data | request_id: req_id}}
       else
         {:error, :no_live_callees} ->
@@ -562,6 +544,12 @@ defmodule Wampex.Router.Session do
     {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]}
   end
 
+  @impl true
+  def handle_info({:EXIT, t, reason}, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
+    debug("Transport #{tt} exited because #{inspect(reason)}: Shutting down")
+    {:ok, data, [{:next_event, :internal, :abort}]}
+  end
+
   @impl true
   def handle_info(event, state, data) do
     Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
@@ -579,14 +567,14 @@ defmodule Wampex.Router.Session do
           subscriptions: subs
         }
       }) do
-    Logger.warn("Router session terminating #{inspect(reason)}")
+    Logger.debug("Router session terminating #{inspect(reason)}")
 
     Enum.each(regs, fn {id, _proc} ->
-      remove_registration(regs, realm, id, db)
+      RealmSession.unregister(db, realm, {id, {self(), Node.self()}}, regs)
     end)
 
     Enum.each(subs, fn {id, _proc, _} ->
-      remove_subscription(subs, id, realm, db)
+      RealmSession.unsubscribe(db, realm, {id, {self(), Node.self()}}, subs)
     end)
 
     send_to_peer(
@@ -655,79 +643,24 @@ defmodule Wampex.Router.Session do
     end
   end
 
-  defp remove_subscription(subs, subscription_id, realm, db) do
-    {id, topic, wc} =
-      sub =
-      Enum.find(subs, fn
-        {^subscription_id, _topic, _} -> true
-        _ -> false
-      end)
-
-    {key, filter} =
-      case wc do
-        true ->
-          key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
-
-          {key,
-           fn {id, values}, value ->
-             {id,
-              Enum.filter(values, fn
-                {_, ^value} -> false
-                _ -> true
-              end)}
-           end}
-
-        false ->
-          {topic,
-           fn {id, values}, value ->
-             {id, List.delete(values, value)}
-           end}
-      end
-
-    ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
-    List.delete(subs, sub)
-  end
-
-  defp remove_registration(regs, realm, registration_id, db) do
-    {id, proc} =
-      reg =
-      Enum.find(regs, fn
-        {^registration_id, _proc} -> true
-        _ -> false
-      end)
-
-    filter = fn {id, values}, value ->
-      {id, List.delete(values, value)}
-    end
-
-    ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
-    List.delete(regs, reg)
-  end
+  defp get_live_callee(_proxy, [], _index, 0), do: {:error, :no_live_callees}
 
-  defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
+  defp get_live_callee(proxy, callees, index, tries) do
+    {_id, {pid, node}} = c = Enum.at(callees, index)
 
-  defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
     case GenServer.call({proxy, node}, {:is_up, pid}) do
-      true -> c
-      false -> get_live_callee(proxy, t)
-    end
-  end
-
-  defp get_subscribers({db, {keyspace, key}, acc}) do
-    case ClusterKV.get(db, keyspace, key, 500) do
-      {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
-      _ -> {db, {keyspace, key}, acc}
-    end
-  end
+      true ->
+        c
 
-  defp get_prefix({db, {keyspace, key}, acc}) do
-    l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
-    {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
-  end
+      false ->
+        index =
+          case index + 1 do
+            ni when ni < length(callees) -> ni
+            _ -> 0
+          end
 
-  defp get_wildcard({db, {keyspace, key}, acc}) do
-    l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
-    {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
+        get_live_callee(proxy, callees, index, tries - 1)
+    end
   end
 
   defp send_to_peer(msg, transport, pid) do
@@ -763,14 +696,6 @@ defmodule Wampex.Router.Session do
     Enum.reverse(message)
   end
 
-  defp get_request_id(current_id) when current_id == @max_id do
-    1
-  end
-
-  defp get_request_id(current_id) do
-    current_id + 1
-  end
-
   defp handle_message(msg, roles) do
     Enum.reduce_while(roles, nil, fn r, _ ->
       try do
index 4cedd83fe66b8db86321466711d109a2274f6a6e..cf10e0809c3212f81c0df30c7d79d288c097cc12 100644 (file)
@@ -74,7 +74,7 @@ defmodule Wampex.Router.Transports.WebSocket do
 
   @impl true
   def terminate(reason, _req, _state) do
-    Logger.info("Websocket closing for reason #{inspect(reason)}")
+    Logger.debug("Websocket closing for reason #{inspect(reason)}")
     :ok
   end