From: Christopher Date: Sat, 21 Mar 2020 15:16:06 +0000 (-0500) Subject: shard proxies based on realm X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=e4c109f959a4adf895728472074816f0d455aa7f;p=wampex_router.git shard proxies based on realm --- diff --git a/.formatter.exs b/.formatter.exs index d2cda26..0a70dc0 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,5 @@ # Used by "mix format" [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], + line_length: 120 ] diff --git a/lib/router.ex b/lib/router.ex index 2650155..c6988a5 100644 --- a/lib/router.ex +++ b/lib/router.ex @@ -1,7 +1,7 @@ defmodule Wampex.Router do use Supervisor - alias Wampex.Router.{Admin, Authentication, Proxy, REST} + alias Wampex.Router.{Admin, Authentication, Realms, REST} alias Wampex.Router.Authentication.EnsureDefaultAdmin alias Wampex.Router.Transports.WebSocket @@ -35,25 +35,15 @@ defmodule Wampex.Router do ) end - @spec init( - {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(), - String.t()} - ) :: + @spec init({module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(), String.t()}) :: {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do children = [ + {ClusterKV, name: db_name(name), topologies: topologies, replicas: replicas, quorum: quorum}, Authentication.Repo, {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]}, - {ClusterKV, - [ - name: db_name(name), - topologies: topologies, - replicas: replicas, - quorum: quorum - ]}, - {Proxy, [name: proxy_name(name)]}, - {Admin, - [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]}, + {Realms, name: realms_name(name)}, + {Admin, name: admin_name(name), db: db_name(name), realm: admin_realm, realms: realms_name(name)}, {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]} ] @@ -61,14 +51,14 @@ defmodule Wampex.Router do end def db_name(name), do: Module.concat([name, KV]) - def proxy_name(name), do: Module.concat([name, Proxy]) + def realms_name(name), do: Module.concat([name, Realms]) def admin_name(name), do: Module.concat([name, Admin]) defp dispatch(name) do [ {:_, [ - {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}}, + {"/ws", WebSocket, %{db: db_name(name), name: name}}, {:_, Plug.Cowboy.Handler, {REST, []}} ]} ] diff --git a/lib/router/admin.ex b/lib/router/admin.ex index 1d484fb..f20906e 100644 --- a/lib/router/admin.ex +++ b/lib/router/admin.ex @@ -5,19 +5,19 @@ defmodule Wampex.Router.Admin do alias Wampex.Roles.Dealer.{Invocation, Result} alias Wampex.Roles.Peer.Error alias Wampex.Router.Authentication.{User, Realm} - alias Wampex.Router.Session + alias Wampex.Router.{Realms, Session} @procedures [ "admin.create_user", "admin.create_realm" ] - def start_link(name: name, db: db, realm: realm, proxy: proxy) do - GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name) + def start_link(name: name, db: db, realm: realm, realms: realms) do + GenServer.start_link(__MODULE__, {db, realm, realms}, name: name) end - def init({db, realm, proxy}) do - {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}} + def init({db, realm, realms}) do + {:ok, %{db: db, realm: realm, proxy: Realms.start_realm(realms, realm), regs: []}, {:continue, :ok}} end def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do @@ -26,6 +26,8 @@ defmodule Wampex.Router.Admin do end def register_procedures(db, realm, regs) do + Logger.info("Registering admin procedures: #{inspect(@procedures)}") + Enum.reduce(@procedures, regs, fn proc, acc -> val = {Session.get_global_id(), {self(), Node.self()}} ClusterKV.put(db, realm, proc, val) diff --git a/lib/router/authentication.ex b/lib/router/authentication.ex index 56c59e9..40a59e9 100644 --- a/lib/router/authentication.ex +++ b/lib/router/authentication.ex @@ -42,8 +42,7 @@ defmodule Wampex.Router.Authentication do def parse_challenge(challenge) do ch = JSON.deserialize!(challenge.challenge) - {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]), - get_in(ch, ["authprovider"])} + {get_in(ch, ["authid"]), get_in(ch, ["authrole"]), get_in(ch, ["authmethod"]), get_in(ch, ["authprovider"])} end def authenticate(signature, realm, authid, %{ diff --git a/lib/router/realms.ex b/lib/router/realms.ex new file mode 100644 index 0000000..06625c6 --- /dev/null +++ b/lib/router/realms.ex @@ -0,0 +1,30 @@ +defmodule Wampex.Router.Realms do + @moduledoc false + use Supervisor + alias Wampex.Router.Realms.Proxy + require Logger + + def start_link(name: name) do + Supervisor.start_link(__MODULE__, name, name: name) + end + + @impl true + def init(name) do + children = [ + {DynamicSupervisor, strategy: :one_for_one, name: realm_supervisor_name(name)} + ] + + Logger.info("Starting Realms Supervisor: #{inspect(children)}") + Supervisor.init(children, strategy: :one_for_one) + end + + def start_realm(name, realm) do + r_name = realm_name(realm) + rs_name = realm_supervisor_name(name) + DynamicSupervisor.start_child(rs_name, {Proxy, name: r_name}) + r_name + end + + def realm_supervisor_name(name), do: Module.concat([name, Supervisor]) + def realm_name(realm), do: Module.concat([__MODULE__, realm]) +end diff --git a/lib/router/proxy.ex b/lib/router/realms/proxy.ex similarity index 89% rename from lib/router/proxy.ex rename to lib/router/realms/proxy.ex index 2ddb7d6..9c8f64f 100644 --- a/lib/router/proxy.ex +++ b/lib/router/realms/proxy.ex @@ -1,4 +1,4 @@ -defmodule Wampex.Router.Proxy do +defmodule Wampex.Router.Realms.Proxy do @moduledoc false use GenServer diff --git a/lib/router/session.ex b/lib/router/session.ex index 106437a..8406778 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -13,7 +13,7 @@ defmodule Wampex.Router.Session do alias Wampex.Router alias Broker.{Subscribed, Published, Event, Unsubscribed} alias Dealer.{Invocation, Registered, Result, Unregistered} - alias Router.Authentication + alias Router.{Authentication, Realms} @enforce_keys [:transport, :transport_pid] defstruct [ @@ -125,8 +125,7 @@ defmodule Wampex.Router.Session do ) do debug("Init") - {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, - [{:next_event, :internal, :transition}]} + {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]} end @impl true @@ -208,15 +207,16 @@ defmodule Wampex.Router.Session do authentication: auth, challenge: challenge, realm: realm, + name: name, authenticate: {:authenticate, sig, _dets} } } = sl ) do info = auth.parse_challenge(challenge) - actions = maybe_welcome(auth, session_id, sig, realm, challenge, info, tt, t) + {actions, proxy} = maybe_welcome({auth, session_id, sig, realm, name, challenge, info, tt, t}) - {:ok, sl, actions} + {:ok, %SL{sl | data: %Sess{sl.data | proxy: proxy}}, actions} end @impl true @@ -297,8 +297,7 @@ defmodule Wampex.Router.Session do send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t) - {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, - [{:next_event, :internal, :transition}]} + {:ok, %SL{sl | data: %Sess{sl.data | subscriptions: subs}}, [{:next_event, :internal, :transition}]} end @impl true @@ -479,8 +478,7 @@ defmodule Wampex.Router.Session do @handle_yield, _, @yield, - %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = - data + %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = data ) do {call_id, _, {pid, node}} = Enum.find(inv, fn @@ -499,8 +497,7 @@ defmodule Wampex.Router.Session do _ -> true end) - {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, - [{:next_event, :internal, :transition}]} + {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, [{:next_event, :internal, :transition}]} end @impl true @@ -562,8 +559,7 @@ defmodule Wampex.Router.Session do @impl true def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do - {:ok, %SL{data | data: %Sess{sess | message: message}}, - [{:next_event, :internal, :message_received}]} + {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]} end @impl true @@ -601,17 +597,12 @@ defmodule Wampex.Router.Session do end defp maybe_welcome( - auth, - session_id, - sig, - realm, - challenge, - {authid, authrole, authmethod, authprovider}, - tt, - t + {auth, session_id, sig, realm, name, challenge, {authid, authrole, authmethod, authprovider}, tt, t} ) do case auth.authenticate(sig, realm, authid, challenge) do true -> + proxy = Realms.start_realm(Router.realms_name(name), realm) + send_to_peer( Peer.welcome(%Welcome{ session_id: session_id, @@ -628,10 +619,10 @@ defmodule Wampex.Router.Session do t ) - [{:next_event, :internal, :transition}] + {[{:next_event, :internal, :transition}], proxy} false -> - [{:next_event, :internal, :transition}, {:next_event, :internal, :abort}] + {[{:next_event, :internal, :transition}, {:next_event, :internal, :abort}], nil} end end diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex index ae36b57..75bfec5 100644 --- a/lib/router/transports/web_socket.ex +++ b/lib/router/transports/web_socket.ex @@ -19,15 +19,15 @@ defmodule Wampex.Router.Transports.WebSocket do end @impl true - def init(req, %{db: db, proxy: proxy}) do + def init(req, %{db: db, name: name}) do {:ok, serializer, protocol} = get_serializer(req) req = :cowboy_req.set_resp_header(@protocol_header, protocol, req) - {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}} + {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, name}} end @impl true - def websocket_init({state, db, proxy}) do - {:ok, session} = start_session(db, proxy) + def websocket_init({state, db, name}) do + {:ok, session} = start_session(db, name) Logger.info("Websocket Initialized: #{inspect(state)}") {:ok, %WebSocket{state | session: session}} end @@ -87,8 +87,8 @@ defmodule Wampex.Router.Transports.WebSocket do {:ok, state} end - defp start_session(db, proxy) do - Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()}) + defp start_session(db, name) do + Session.start_link(%Session{db: db, name: name, transport: WebSocket, transport_pid: self()}) end defp get_serializer(req) do diff --git a/mix.exs b/mix.exs index b306e20..9015ad7 100644 --- a/mix.exs +++ b/mix.exs @@ -39,8 +39,7 @@ defmodule Wampex.Router.MixProject do [ # {:cluster_kv, path: "../cluster_kv"}, {:cluster_kv, - git: "https://gitlab.com/entropealabs/cluster_kv.git", - tag: "4c36b8d68f40711cd167edb9917d32a992f49561"}, + git: "https://gitlab.com/entropealabs/cluster_kv.git", tag: "4c36b8d68f40711cd167edb9917d32a992f49561"}, {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, @@ -53,9 +52,7 @@ defmodule Wampex.Router.MixProject do {:plug_cowboy, "~> 2.1"}, {:postgrex, ">= 0.0.0"}, {:states_language, "~> 0.2"}, - {:wampex, - git: "https://gitlab.com/entropealabs/wampex.git", - tag: "7c5d257937ace7776f61ad63f8590c24ef5e382a"}, + {:wampex, git: "https://gitlab.com/entropealabs/wampex.git", tag: "7c5d257937ace7776f61ad63f8590c24ef5e382a"}, {:wampex_client, git: "https://gitlab.com/entropealabs/wampex_client.git", tag: "59424e0d6a7bbc2646ab1cad9e71a8d27a016ac9", diff --git a/test/support/test_subscriber.ex b/test/support/test_subscriber.ex index e86d354..77af14c 100644 --- a/test/support/test_subscriber.ex +++ b/test/support/test_subscriber.ex @@ -15,8 +15,7 @@ defmodule TestSubscriber do {:ok, sub2} = Client.subscribe(name, %Subscribe{topic: "com.data.test"}) {:ok, sub3} = Client.subscribe(name, %Subscribe{topic: "com.data"}) - {:ok, sub4} = - Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}}) + {:ok, sub4} = Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}}) {:ok, sub5} = Client.subscribe(name, %Subscribe{ diff --git a/test/wampex_test.exs b/test/wampex_test.exs index 7a12881..c496773 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -121,8 +121,7 @@ defmodule WampexTest do @tag :client test "Peer.hello" do - assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] = - Peer.hello(%Hello{realm: "test", roles: [Callee]}) + assert [1, "test", %{agent: "WAMPex", roles: %{callee: %{}}}] = Peer.hello(%Hello{realm: "test", roles: [Callee]}) end @tag :client @@ -162,8 +161,7 @@ defmodule WampexTest do @tag :client test "Subscriber.subscribe" do - assert [32, %{test: 1}, "test"] = - Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}}) + assert [32, %{test: 1}, "test"] = Subscriber.subscribe(%Subscribe{topic: "test", options: %{test: 1}}) end @tag :client @@ -178,8 +176,7 @@ defmodule WampexTest do @tag :router test "Dealer.registered" do - assert [65, 123_456, 765_432] = - Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432}) + assert [65, 123_456, 765_432] = Dealer.registered(%Registered{request_id: 123_456, registration_id: 765_432}) end @tag :router @@ -203,8 +200,7 @@ defmodule WampexTest do @tag :router test "Dealer.invocation" do - assert [68, 1234, 1234, %{}, [], %{}] = - Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234}) + assert [68, 1234, 1234, %{}, [], %{}] = Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234}) assert [68, 1234, 1234, %{}, ["1"], %{}] = Dealer.invocation(%Invocation{ @@ -247,8 +243,7 @@ defmodule WampexTest do @tag :router test "Broker.subscribed" do - assert [33, 123_456, 123_456] = - Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456}) + assert [33, 123_456, 123_456] = Broker.subscribed(%Subscribed{request_id: 123_456, subscription_id: 123_456}) end @tag :router @@ -258,8 +253,7 @@ defmodule WampexTest do @tag :router test "Broker.published" do - assert [17, 123_456, 654_321] = - Broker.published(%Published{request_id: 123_456, publication_id: 654_321}) + assert [17, 123_456, 654_321] = Broker.published(%Published{request_id: 123_456, publication_id: 654_321}) end @tag :client @@ -321,9 +315,7 @@ defmodule WampexTest do caller_name = TestExistCaller Client.start_link(name: caller_name, session: @session) - assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"}, - [], - %{}} = + assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"}, [], %{}} = Client.send_request( caller_name, Caller.call(%Call{