]> Entropealabs - wampex_router.git/commitdiff
shard proxies based on realm
authorChristopher <chris@entropealabs.com>
Sat, 21 Mar 2020 15:16:06 +0000 (10:16 -0500)
committerChristopher <chris@entropealabs.com>
Sat, 21 Mar 2020 15:16:06 +0000 (10:16 -0500)
.formatter.exs
lib/router.ex
lib/router/admin.ex
lib/router/authentication.ex
lib/router/realms.ex [new file with mode: 0644]
lib/router/realms/proxy.ex [moved from lib/router/proxy.ex with 89% similarity]
lib/router/session.ex
lib/router/transports/web_socket.ex
mix.exs
test/support/test_subscriber.ex
test/wampex_test.exs

index d2cda26eddc9110057ce9e8002585682217542cc..0a70dc0f6841b601f6c1e09d6912f877f3d84091 100644 (file)
@@ -1,4 +1,5 @@
 # Used by "mix format"
 [
-  inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
+  inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
+  line_length: 120
 ]
index 2650155fc3651395dfaf299c69e39d21d74006e1..c6988a55b34f3fa3bb21d409baaef7c7f53b02c1 100644 (file)
@@ -1,7 +1,7 @@
 defmodule Wampex.Router do
   use Supervisor
 
-  alias Wampex.Router.{Admin, Authentication, Proxy, REST}
+  alias Wampex.Router.{Admin, Authentication, Realms, REST}
   alias Wampex.Router.Authentication.EnsureDefaultAdmin
   alias Wampex.Router.Transports.WebSocket
 
@@ -35,25 +35,15 @@ defmodule Wampex.Router do
     )
   end
 
-  @spec init(
-          {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(),
-           String.t()}
-        ) ::
+  @spec init({module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(), String.t()}) ::
           {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
   def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do
     children = [
+      {ClusterKV, name: db_name(name), topologies: topologies, replicas: replicas, quorum: quorum},
       Authentication.Repo,
       {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]},
-      {ClusterKV,
-       [
-         name: db_name(name),
-         topologies: topologies,
-         replicas: replicas,
-         quorum: quorum
-       ]},
-      {Proxy, [name: proxy_name(name)]},
-      {Admin,
-       [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]},
+      {Realms, name: realms_name(name)},
+      {Admin, name: admin_name(name), db: db_name(name), realm: admin_realm, realms: realms_name(name)},
       {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
     ]
 
@@ -61,14 +51,14 @@ defmodule Wampex.Router do
   end
 
   def db_name(name), do: Module.concat([name, KV])
-  def proxy_name(name), do: Module.concat([name, Proxy])
+  def realms_name(name), do: Module.concat([name, Realms])
   def admin_name(name), do: Module.concat([name, Admin])
 
   defp dispatch(name) do
     [
       {:_,
        [
-         {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
+         {"/ws", WebSocket, %{db: db_name(name), name: name}},
          {:_, Plug.Cowboy.Handler, {REST, []}}
        ]}
     ]
index 1d484fb3a3a154fff1b1fcadd34c959b6fd3452f..f20906e8c99ae0b65e494cf1065b7542c5fbbb59 100644 (file)
@@ -5,19 +5,19 @@ defmodule Wampex.Router.Admin do
   alias Wampex.Roles.Dealer.{Invocation, Result}
   alias Wampex.Roles.Peer.Error
   alias Wampex.Router.Authentication.{User, Realm}
-  alias Wampex.Router.Session
+  alias Wampex.Router.{Realms, Session}
 
   @procedures [
     "admin.create_user",
     "admin.create_realm"
   ]
 
-  def start_link(name: name, db: db, realm: realm, proxy: proxy) do
-    GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name)
+  def start_link(name: name, db: db, realm: realm, realms: realms) do
+    GenServer.start_link(__MODULE__, {db, realm, realms}, name: name)
   end
 
-  def init({db, realm, proxy}) do
-    {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}}
+  def init({db, realm, realms}) do
+    {:ok, %{db: db, realm: realm, proxy: Realms.start_realm(realms, realm), regs: []}, {:continue, :ok}}
   end
 
   def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do
@@ -26,6 +26,8 @@ defmodule Wampex.Router.Admin do
   end
 
   def register_procedures(db, realm, regs) do
+    Logger.info("Registering admin procedures: #{inspect(@procedures)}")
+
     Enum.reduce(@procedures, regs, fn proc, acc ->
       val = {Session.get_global_id(), {self(), Node.self()}}
       ClusterKV.put(db, realm, proc, val)
index 56c59e94a8438f6d81875836ca5ec881073916de..40a59e9f0c3155e7ed8b00c8c68daf8a2bce1e6a 100644 (file)
@@ -42,8 +42,7 @@ defmodule Wampex.Router.Authentication do
   def parse_challenge(challenge) do
     ch = JSON.deserialize!(challenge.challenge)
 
-    {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]),
-     get_in(ch, ["authprovider"])}
+    {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]), get_in(ch, ["authprovider"])}
   end
 
   def authenticate(signature, realm, authid, %{
diff --git a/lib/router/realms.ex b/lib/router/realms.ex
new file mode 100644 (file)
index 0000000..06625c6
--- /dev/null
@@ -0,0 +1,30 @@
+defmodule Wampex.Router.Realms do
+  @moduledoc false
+  use Supervisor
+  alias Wampex.Router.Realms.Proxy
+  require Logger
+
+  def start_link(name: name) do
+    Supervisor.start_link(__MODULE__, name, name: name)
+  end
+
+  @impl true
+  def init(name) do
+    children = [
+      {DynamicSupervisor, strategy: :one_for_one, name: realm_supervisor_name(name)}
+    ]
+
+    Logger.info("Starting Realms Supervisor: #{inspect(children)}")
+    Supervisor.init(children, strategy: :one_for_one)
+  end
+
+  def start_realm(name, realm) do
+    r_name = realm_name(realm)
+    rs_name = realm_supervisor_name(name)
+    DynamicSupervisor.start_child(rs_name, {Proxy, name: r_name})
+    r_name
+  end
+
+  def realm_supervisor_name(name), do: Module.concat([name, Supervisor])
+  def realm_name(realm), do: Module.concat([__MODULE__, realm])
+end
similarity index 89%
rename from lib/router/proxy.ex
rename to lib/router/realms/proxy.ex
index 2ddb7d6e854c78c680d00fe6cbb106622cefe71d..9c8f64f0ee650f502cd5eb08d4dd3ce5cc1811d5 100644 (file)
@@ -1,4 +1,4 @@
-defmodule Wampex.Router.Proxy do
+defmodule Wampex.Router.Realms.Proxy do
   @moduledoc false
   use GenServer
 
index 106437a187813a1a547efcb1135dfe2961e39245..8406778ba4d00f56913342eba510e7825c5f7710 100644 (file)
@@ -13,7 +13,7 @@ defmodule Wampex.Router.Session do
   alias Wampex.Router
   alias Broker.{Subscribed, Published, Event, Unsubscribed}
   alias Dealer.{Invocation, Registered, Result, Unregistered}
-  alias Router.Authentication
+  alias Router.{Authentication, Realms}
 
   @enforce_keys [:transport, :transport_pid]
   defstruct [
@@ -125,8 +125,7 @@ defmodule Wampex.Router.Session do
       ) do
     debug("Init")
 
-    {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
-     [{:next_event, :internal, :transition}]}
+    {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]}
   end
 
   @impl true
@@ -208,15 +207,16 @@ defmodule Wampex.Router.Session do
             authentication: auth,
             challenge: challenge,
             realm: realm,
+            name: name,
             authenticate: {:authenticate, sig, _dets}
           }
         } = sl
       ) do
     info = auth.parse_challenge(challenge)
 
-    actions = maybe_welcome(auth, session_id, sig, realm, challenge, info, tt, t)
+    {actions, proxy} = maybe_welcome({auth, session_id, sig, realm, name, challenge, info, tt, t})
 
-    {:ok, sl, actions}
+    {:ok, %SL{sl | data: %Sess{sl.data | proxy: proxy}}, actions}
   end
 
   @impl true
@@ -297,8 +297,7 @@ defmodule Wampex.Router.Session do
 
     send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
 
-    {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}},
-     [{:next_event, :internal, :transition}]}
+    {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, [{:next_event, :internal, :transition}]}
   end
 
   @impl true
@@ -479,8 +478,7 @@ defmodule Wampex.Router.Session do
         @handle_yield,
         _,
         @yield,
-        %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
-          data
+        %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = data
       ) do
     {call_id, _, {pid, node}} =
       Enum.find(inv, fn
@@ -499,8 +497,7 @@ defmodule Wampex.Router.Session do
         _ -> true
       end)
 
-    {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
-     [{:next_event, :internal, :transition}]}
+    {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, [{:next_event, :internal, :transition}]}
   end
 
   @impl true
@@ -562,8 +559,7 @@ defmodule Wampex.Router.Session do
 
   @impl true
   def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
-    {:ok, %SL{data | data: %Sess{sess | message: message}},
-     [{:next_event, :internal, :message_received}]}
+    {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]}
   end
 
   @impl true
@@ -601,17 +597,12 @@ defmodule Wampex.Router.Session do
   end
 
   defp maybe_welcome(
-         auth,
-         session_id,
-         sig,
-         realm,
-         challenge,
-         {authid, authrole, authmethod, authprovider},
-         tt,
-         t
+         {auth, session_id, sig, realm, name, challenge, {authid, authrole, authmethod, authprovider}, tt, t}
        ) do
     case auth.authenticate(sig, realm, authid, challenge) do
       true ->
+        proxy = Realms.start_realm(Router.realms_name(name), realm)
+
         send_to_peer(
           Peer.welcome(%Welcome{
             session_id: session_id,
@@ -628,10 +619,10 @@ defmodule Wampex.Router.Session do
           t
         )
 
-        [{:next_event, :internal, :transition}]
+        {[{:next_event, :internal, :transition}], proxy}
 
       false ->
-        [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}]
+        {[{:next_event, :internal, :transition}, {:next_event, :internal, :abort}], nil}
     end
   end
 
index ae36b57617f3675a937c51b67e751625157baa69..75bfec51049fce4aa3c57ddad4e959f3fe9e4044 100644 (file)
@@ -19,15 +19,15 @@ defmodule Wampex.Router.Transports.WebSocket do
   end
 
   @impl true
-  def init(req, %{db: db, proxy: proxy}) do
+  def init(req, %{db: db, name: name}) do
     {:ok, serializer, protocol} = get_serializer(req)
     req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
-    {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
+    {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, name}}
   end
 
   @impl true
-  def websocket_init({state, db, proxy}) do
-    {:ok, session} = start_session(db, proxy)
+  def websocket_init({state, db, name}) do
+    {:ok, session} = start_session(db, name)
     Logger.info("Websocket Initialized: #{inspect(state)}")
     {:ok, %WebSocket{state | session: session}}
   end
@@ -87,8 +87,8 @@ defmodule Wampex.Router.Transports.WebSocket do
     {:ok, state}
   end
 
-  defp start_session(db, proxy) do
-    Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
+  defp start_session(db, name) do
+    Session.start_link(%Session{db: db, name: name, transport: WebSocket, transport_pid: self()})
   end
 
   defp get_serializer(req) do
diff --git a/mix.exs b/mix.exs
index b306e20adec293c9946fd1a0e8dfe2ac2b6a718e..9015ad74212f7f16e6bcd9667299f80f29cb0e30 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -39,8 +39,7 @@ defmodule Wampex.Router.MixProject do
     [
       # {:cluster_kv, path: "../cluster_kv"},
       {:cluster_kv,
-       git: "https://gitlab.com/entropealabs/cluster_kv.git",
-       tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
+       git: "https://gitlab.com/entropealabs/cluster_kv.git", tag: "4c36b8d68f40711cd167edb9917d32a992f49561"},
       {:cors_plug, "~> 2.0"},
       {:credo, "~> 1.2", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
@@ -53,9 +52,7 @@ defmodule Wampex.Router.MixProject do
       {:plug_cowboy, "~> 2.1"},
       {:postgrex, ">= 0.0.0"},
       {:states_language, "~> 0.2"},
-      {:wampex,
-       git: "https://gitlab.com/entropealabs/wampex.git",
-       tag: "7c5d257937ace7776f61ad63f8590c24ef5e382a"},
+      {:wampex, git: "https://gitlab.com/entropealabs/wampex.git", tag: "7c5d257937ace7776f61ad63f8590c24ef5e382a"},
       {:wampex_client,
        git: "https://gitlab.com/entropealabs/wampex_client.git",
        tag: "59424e0d6a7bbc2646ab1cad9e71a8d27a016ac9",
index e86d354e30263681e0bbcf5d479107f9a968c652..77af14c1c28d8161d47515829d5b06de5fc21c29 100644 (file)
@@ -15,8 +15,7 @@ defmodule TestSubscriber do
     {:ok, sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
     {:ok, sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"})
 
-    {:ok, sub4} =
-      Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
+    {:ok, sub4} = Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
 
     {:ok, sub5} =
       Client.subscribe(name, %Subscribe{
index 7a12881114e96d8b36b1817bf8d121a749b3a436..c49677324761e087bd0c1d99e94e5f0d183e8820 100644 (file)
@@ -121,8 +121,7 @@ defmodule WampexTest do
 
   @tag :client
   test "Peer.hello" do
-    assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] =
-             Peer.hello(%Hello{realm: "test", roles: [Callee]})
+    assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] = Peer.hello(%Hello{realm: "test", roles: [Callee]})
   end
 
   @tag :client
@@ -162,8 +161,7 @@ defmodule WampexTest do
 
   @tag :client
   test "Subscriber.subscribe" do
-    assert [32, %{test: 1}, "test"] =
-             Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}})
+    assert [32, %{test: 1}, "test"] = Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}})
   end
 
   @tag :client
@@ -178,8 +176,7 @@ defmodule WampexTest do
 
   @tag :router
   test "Dealer.registered" do
-    assert [65, 123_456, 765_432] =
-             Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432})
+    assert [65, 123_456, 765_432] = Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432})
   end
 
   @tag :router
@@ -203,8 +200,7 @@ defmodule WampexTest do
 
   @tag :router
   test "Dealer.invocation" do
-    assert [68, 1234, 1234, %{}, [], %{}] =
-             Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234})
+    assert [68, 1234, 1234, %{}, [], %{}] = Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234})
 
     assert [68, 1234, 1234, %{}, ["1"], %{}] =
              Dealer.invocation(%Invocation{
@@ -247,8 +243,7 @@ defmodule WampexTest do
 
   @tag :router
   test "Broker.subscribed" do
-    assert [33, 123_456, 123_456] =
-             Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456})
+    assert [33, 123_456, 123_456] = Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456})
   end
 
   @tag :router
@@ -258,8 +253,7 @@ defmodule WampexTest do
 
   @tag :router
   test "Broker.published" do
-    assert [17, 123_456, 654_321] =
-             Broker.published(%Published{request_id: 123_456, publication_id: 654_321})
+    assert [17, 123_456, 654_321] = Broker.published(%Published{request_id: 123_456, publication_id: 654_321})
   end
 
   @tag :client
@@ -321,9 +315,7 @@ defmodule WampexTest do
     caller_name = TestExistCaller
     Client.start_link(name: caller_name, session: @session)
 
-    assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"},
-            [],
-            %{}} =
+    assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"}, [], %{}} =
              Client.send_request(
                caller_name,
                Caller.call(%Call{