From: Christopher Date: Sun, 15 Mar 2020 18:26:54 +0000 (-0500) Subject: routed RPC working X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=808cf1d1a803cd902f5247225c570d816f251b68;p=wampex.git routed RPC working --- diff --git a/.tool-versions b/.tool-versions index c217a1b..30c5b80 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.10.1 +elixir 1.9.4-otp-22 erlang 22.2.6 diff --git a/config/config.exs b/config/config.exs index 19932f3..988fb25 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,3 +1,14 @@ use Mix.Config -config :wampex, state_machine: "priv/session.json" +config :cluster_kv, + topologies: [ + kv: [ + strategy: Cluster.Strategy.Epmd, + config: [ + hosts: [] + ], + connect: {:net_kernel, :connect_node, []}, + disconnect: {:erlang, :disconnect_node, []}, + list_nodes: {:erlang, :nodes, [:connected]} + ] + ] diff --git a/lib/roles/dealer.ex b/lib/roles/dealer.ex index a4c8624..61bab0b 100644 --- a/lib/roles/dealer.ex +++ b/lib/roles/dealer.ex @@ -111,18 +111,19 @@ defmodule Wampex.Roles.Dealer do end @impl true - def handle([@call, id, dets]) do - handle([@call, id, dets, [], %{}]) + def handle([@call, id, dets, proc]) do + handle([@call, id, dets, proc, [], %{}]) end @impl true - def handle([@call, id, dets, arg_l]) do - handle([@call, id, dets, arg_l, %{}]) + def handle([@call, id, dets, proc, arg_l]) do + handle([@call, id, dets, proc, arg_l, %{}]) end @impl true - def handle([@call, id, dets, arg_l, arg_kw]) do - {[{:next_event, :internal, :call}], id, {:update, :call, {:call, id, dets, arg_l, arg_kw}}} + def handle([@call, id, dets, proc, arg_l, arg_kw]) do + {[{:next_event, :internal, :call}], id, + {:update, :call, {:call, id, dets, proc, arg_l, arg_kw}}} end @impl true diff --git a/lib/router.ex b/lib/router.ex index 2bd3f6a..9736b83 100644 --- a/lib/router.ex +++ b/lib/router.ex @@ -1,31 +1,42 @@ defmodule Wampex.Router do use Supervisor - alias Wampex.Router.REST + alias Wampex.Router.{Proxy, REST} alias Wampex.Router.Transports.WebSocket - @spec start_link(name: module(), port: pos_integer()) :: + @spec start_link(name: module(), port: pos_integer(), topologies: []) :: {:ok, pid()} | {:error, {:already_started, pid()} | {:shutdown, term()} | term()} - def start_link(name: name, port: port) when is_atom(name) do - Supervisor.start_link(__MODULE__, port, name: name) + def start_link(name: name, port: port, topologies: topologies) when is_atom(name) do + Supervisor.start_link(__MODULE__, {name, port, topologies}, name: name) end - @spec init(pos_integer()) :: + @spec init({module(), pos_integer(), list()}) :: {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore - def init(port) do + def init({name, port, topologies}) do children = [ - {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch()]} + {ClusterKV, + [ + name: db_name(name), + topologies: topologies, + replicas: 1, + quorum: 1 + ]}, + {Proxy, [name: proxy_name(name)]}, + {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]} ] Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10) end - defp dispatch do + def db_name(name), do: Module.concat([name, KV]) + def proxy_name(name), do: Module.concat([name, Proxy]) + + defp dispatch(name) do [ {:_, [ - {"/ws", WebSocket, []}, + {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}}, {:_, Plug.Cowboy.Handler, {REST, []}} ]} ] diff --git a/lib/router/proxy.ex b/lib/router/proxy.ex new file mode 100644 index 0000000..c931e07 --- /dev/null +++ b/lib/router/proxy.ex @@ -0,0 +1,17 @@ +defmodule Wampex.Router.Proxy do + @moduledoc false + use GenServer + + def start_link(name: name) do + GenServer.start_link(__MODULE__, :ok, name: name) + end + + def init(:ok) do + {:ok, []} + end + + def handle_info({e, to}, state) do + send(to, e) + {:noreply, state} + end +end diff --git a/lib/router/session.ex b/lib/router/session.ex index 895eab5..7aded9e 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -12,10 +12,13 @@ defmodule Wampex.Router.Session do alias Wampex.Roles.Peer.{Challenge, Welcome} alias Wampex.Router alias __MODULE__, as: Sess + alias Dealer.{Invocation, Registered, Result, Unregistered} @enforce_keys [:transport, :transport_pid] defstruct [ :id, + :db, + :proxy, :transport, :transport_pid, :message, @@ -33,6 +36,9 @@ defmodule Wampex.Router.Session do :name, :goodbye, roles: [Peer, Broker, Dealer], + registrations: [], + subscriptions: [], + invocations: [], message_queue: [], request_id: 0, requests: [] @@ -40,6 +46,8 @@ defmodule Wampex.Router.Session do @type t :: %__MODULE__{ id: integer() | nil, + db: module() | nil, + proxy: module() | nil, transport_pid: pid() | module() | nil, message: Wampex.message() | nil, hello: Wampex.hello() | nil, @@ -55,6 +63,9 @@ defmodule Wampex.Router.Session do realm: Realm.t(), name: module() | nil, roles: [module()], + registrations: [], + subscriptions: [], + invocations: [], message_queue: [], request_id: integer(), requests: [] @@ -104,7 +115,7 @@ defmodule Wampex.Router.Session do ) do debug("Init") - {:ok, %SL{data | data: %Sess{data.data | id: Enum.random(0..@max_id)}}, + {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]} end @@ -131,7 +142,7 @@ defmodule Wampex.Router.Session do {%SL{data: sess} = data, response} = maybe_update_response(data, response) {requests, actions} = resp = handle_response(id, actions, requests, response) debug("Response: #{inspect(resp)}") - {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions} + {:ok, %SL{data: %Sess{sess | requests: requests}}, actions} end @impl true @@ -154,14 +165,96 @@ defmodule Wampex.Router.Session do _, @register, %SL{ - data: %Sess{transport: tt, transport_pid: t, register: r} + data: + %Sess{ + transport: tt, + transport_pid: t, + registrations: regs, + db: db, + register: {:register, rid, opts, procedure} + } = data } = sl ) do - # TODO check for authentication request - debug("Handling Registration #{inspect(r)}") + debug("Handling Registration #{inspect(procedure)}") + id = get_global_id() + ClusterKV.put(db, procedure, {id, {self(), Node.self()}}) + tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id})) + + {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}}, + [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_call, + _, + @call, + %SL{ + data: + %Sess{ + db: db, + proxy: proxy, + transport: tt, + transport_pid: t, + call: {:call, ri, dets, proc, al, akw} + } = data + } = sl + ) do + {key, [{id, {pid, node}} | _]} = ClusterKV.get(db, proc, 500) + + Logger.info( + "Sending Invocation to #{inspect(proxy)} at node #{inspect(node)} with pid #{inspect(pid)}" + ) + + send( + {proxy, node}, + { + { + %Invocation{ + request_id: ri, + registration_id: id, + options: dets, + arg_list: al, + arg_kw: akw + }, + {self(), Node.self()} + }, + pid + } + ) + {:ok, sl, [{:next_event, :internal, :transition}]} end + @impl true + def handle_resource( + @handle_yield, + _, + @yield, + %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = + data + ) do + {_, {pid, node}} = + Enum.find(inv, fn + {%Invocation{request_id: ^id}, _} -> true + _ -> false + end) + + send( + {proxy, node}, + {%Result{request_id: id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid} + ) + + inv = + Enum.filter(inv, fn + {%Invocation{request_id: ^id}, _} -> false + _ -> false + end) + + {:ok, %SL{data | data: %Sess{data.data | invocations: inv}}, + [{:next_event, :internal, :transition}]} + end + @impl true def handle_resource( @handle_subscribe, @@ -199,12 +292,35 @@ defmodule Wampex.Router.Session do {:ok, data, []} end + @impl true + def handle_info( + {%Invocation{} = e, from}, + _, + %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data + ) do + Logger.info("Received Invocation #{inspect(e)}") + tt.send_request(t, Dealer.invocation(e)) + {:ok, %SL{data | data: %Sess{data.data | invocations: [{e, from} | inv]}}, []} + end + + @impl true + def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do + tt.send_request(t, Dealer.result(e)) + {:ok, data, []} + end + @impl true def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]} end + @impl true + def handle_info(event, state, data) do + Logger.info("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}") + {:ok, data, []} + end + defp handle_response(nil, actions, requests, _), do: {requests, actions} defp handle_response(id, actions, requests, response) do @@ -220,6 +336,10 @@ defmodule Wampex.Router.Session do end) end + defp get_global_id do + Enum.random(0..@max_id) + end + defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do send(from, {:progress, arg_l, arg_kw}) [] diff --git a/lib/router/transports/web_socket.ex b/lib/router/transports/web_socket.ex index 54ae007..02173cb 100644 --- a/lib/router/transports/web_socket.ex +++ b/lib/router/transports/web_socket.ex @@ -19,15 +19,15 @@ defmodule Wampex.Router.Transports.WebSocket do end @impl true - def init(req, _state) do + def init(req, %{db: db, proxy: proxy}) do {:ok, serializer, protocol} = get_serializer(req) req = :cowboy_req.set_resp_header(@protocol_header, protocol, req) - {:cowboy_websocket, req, %WebSocket{serializer: serializer}} + {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}} end @impl true - def websocket_init(state) do - {:ok, session} = start_session(state) + def websocket_init({state, db, proxy}) do + {:ok, session} = start_session(db, proxy) Logger.info("Websocket Initialized: #{inspect(state)}") {:ok, %WebSocket{state | session: session}} end @@ -60,8 +60,8 @@ defmodule Wampex.Router.Transports.WebSocket do {:ok, state} end - defp start_session(_state) do - Session.start_link(%Session{transport: WebSocket, transport_pid: self()}) + defp start_session(db, proxy) do + Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()}) end defp get_serializer(req) do diff --git a/lib/wampex.ex b/lib/wampex.ex index 30a6f1f..4bbddba 100644 --- a/lib/wampex.ex +++ b/lib/wampex.ex @@ -12,8 +12,8 @@ defmodule Wampex do @type authenticate :: {:authenticate, signature :: binary(), details :: map()} @type call :: - {:call, request_id :: integer(), details :: map(), arg_list :: arg_list(), - arg_keywords :: arg_keyword()} + {:call, request_id :: integer(), details :: map(), procedure :: String.t(), + arg_list :: arg_list(), arg_keywords :: arg_keyword()} @type yield :: {:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(), diff --git a/mix.exs b/mix.exs index 4de3490..66459e1 100644 --- a/mix.exs +++ b/mix.exs @@ -37,6 +37,9 @@ defmodule Wampex.MixProject do defp deps do [ + {:cluster_kv, + git: "https://gitlab.com/entropealabs/cluster_kv.git", + tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"}, {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index bc5f8f6..2b4fe0b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,7 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, + "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "c274b77acb2711ac0c8a79253cb7870c606f7297", [tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"]}, "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"}, "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"}, "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"}, @@ -17,6 +18,8 @@ "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"}, "jsx": {:hex, :jsx, "2.10.0", "77760560d6ac2b8c51fd4c980e9e19b784016aa70be354ce746472c33beb0b1c", [:rebar3], [], "hexpm", "9a83e3704807298016968db506f9fad0f027de37546eb838b3ae1064c3a0ad62"}, + "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, @@ -29,6 +32,7 @@ "plug": {:hex, :plug, "1.9.0", "8d7c4e26962283ff9f8f3347bd73838e2413fbc38b7bb5467d5924f68f3a5a4a", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "9902eda2c52ada2a096434682e99a2493f5d06a94d6ac6bcfff9805f952350f1"}, "plug_cowboy": {:hex, :plug_cowboy, "2.1.2", "8b0addb5908c5238fac38e442e81b6fcd32788eaa03246b4d55d147c47c5805e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7d722581ce865a237e14da6d946f92704101740a256bd13ec91e63c0b122fc70"}, "plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"}, "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"}, diff --git a/test/wampex_test.exs b/test/wampex_test.exs index b9b8ef4..a1dfa47 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -24,7 +24,8 @@ defmodule WampexTest do @device "as987d9a8sd79a87ds" setup_all do - [server: Router.start_link(name: TestRouter, port: 4000)] + topologies = Application.get_env(:cluster_kv, :topologies) + [server: Router.start_link(name: TestRouter, port: 4000, topologies: topologies)] end @session %Session{url: @url, realm: @realm, roles: @roles}