]> Entropealabs - wampex.git/commitdiff
routed RPC working
authorChristopher <chris@entropealabs.com>
Sun, 15 Mar 2020 18:26:54 +0000 (13:26 -0500)
committerChristopher <chris@entropealabs.com>
Sun, 15 Mar 2020 18:26:54 +0000 (13:26 -0500)
.tool-versions
config/config.exs
lib/roles/dealer.ex
lib/router.ex
lib/router/proxy.ex [new file with mode: 0644]
lib/router/session.ex
lib/router/transports/web_socket.ex
lib/wampex.ex
mix.exs
mix.lock
test/wampex_test.exs

index c217a1bce95dbc831476b2eaf4ae1d6df79c2f4c..30c5b80cff5336a055d307cd57018a504022f5aa 100644 (file)
@@ -1,2 +1,2 @@
-elixir 1.10.1
+elixir 1.9.4-otp-22
 erlang 22.2.6
index 19932f3ed431dcac61d5f5478fdf2624f5a1e28d..988fb251decd957746ffee0c46d934366bfdb39a 100644 (file)
@@ -1,3 +1,14 @@
 use Mix.Config
 
-config :wampex, state_machine: "priv/session.json"
+config :cluster_kv,
+  topologies: [
+    kv: [
+      strategy: Cluster.Strategy.Epmd,
+      config: [
+        hosts: []
+      ],
+      connect: {:net_kernel, :connect_node, []},
+      disconnect: {:erlang, :disconnect_node, []},
+      list_nodes: {:erlang, :nodes, [:connected]}
+    ]
+  ]
index a4c86240ed1a0788c21ef3aafbec331ba8a8cefb..61bab0b912c5201c548b4f2a6e61d25d531d27b1 100644 (file)
@@ -111,18 +111,19 @@ defmodule Wampex.Roles.Dealer do
   end
 
   @impl true
-  def handle([@call, id, dets]) do
-    handle([@call, id, dets, [], %{}])
+  def handle([@call, id, dets, proc]) do
+    handle([@call, id, dets, proc, [], %{}])
   end
 
   @impl true
-  def handle([@call, id, dets, arg_l]) do
-    handle([@call, id, dets, arg_l, %{}])
+  def handle([@call, id, dets, proc, arg_l]) do
+    handle([@call, id, dets, proc, arg_l, %{}])
   end
 
   @impl true
-  def handle([@call, id, dets, arg_l, arg_kw]) do
-    {[{:next_event, :internal, :call}], id, {:update, :call, {:call, id, dets, arg_l, arg_kw}}}
+  def handle([@call, id, dets, proc, arg_l, arg_kw]) do
+    {[{:next_event, :internal, :call}], id,
+     {:update, :call, {:call, id, dets, proc, arg_l, arg_kw}}}
   end
 
   @impl true
index 2bd3f6a9359178f33ebcb096dd1889b0d72e65f5..9736b83a90c24b664f60049c34385204f6c09771 100644 (file)
@@ -1,31 +1,42 @@
 defmodule Wampex.Router do
   use Supervisor
 
-  alias Wampex.Router.REST
+  alias Wampex.Router.{Proxy, REST}
   alias Wampex.Router.Transports.WebSocket
 
-  @spec start_link(name: module(), port: pos_integer()) ::
+  @spec start_link(name: module(), port: pos_integer(), topologies: []) ::
           {:ok, pid()}
           | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
-  def start_link(name: name, port: port) when is_atom(name) do
-    Supervisor.start_link(__MODULE__, port, name: name)
+  def start_link(name: name, port: port, topologies: topologies) when is_atom(name) do
+    Supervisor.start_link(__MODULE__, {name, port, topologies}, name: name)
   end
 
-  @spec init(pos_integer()) ::
+  @spec init({module(), pos_integer(), list()}) ::
           {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
-  def init(port) do
+  def init({name, port, topologies}) do
     children = [
-      {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch()]}
+      {ClusterKV,
+       [
+         name: db_name(name),
+         topologies: topologies,
+         replicas: 1,
+         quorum: 1
+       ]},
+      {Proxy, [name: proxy_name(name)]},
+      {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
     ]
 
     Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10)
   end
 
-  defp dispatch do
+  def db_name(name), do: Module.concat([name, KV])
+  def proxy_name(name), do: Module.concat([name, Proxy])
+
+  defp dispatch(name) do
     [
       {:_,
        [
-         {"/ws", WebSocket, []},
+         {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
          {:_, Plug.Cowboy.Handler, {REST, []}}
        ]}
     ]
diff --git a/lib/router/proxy.ex b/lib/router/proxy.ex
new file mode 100644 (file)
index 0000000..c931e07
--- /dev/null
@@ -0,0 +1,17 @@
+defmodule Wampex.Router.Proxy do
+  @moduledoc false
+  use GenServer
+
+  def start_link(name: name) do
+    GenServer.start_link(__MODULE__, :ok, name: name)
+  end
+
+  def init(:ok) do
+    {:ok, []}
+  end
+
+  def handle_info({e, to}, state) do
+    send(to, e)
+    {:noreply, state}
+  end
+end
index 895eab5257001475c44890df1d0d7279ce943757..7aded9e18059d5fdd84aa5920dc009828ed738ec 100644 (file)
@@ -12,10 +12,13 @@ defmodule Wampex.Router.Session do
   alias Wampex.Roles.Peer.{Challenge, Welcome}
   alias Wampex.Router
   alias __MODULE__, as: Sess
+  alias Dealer.{Invocation, Registered, Result, Unregistered}
 
   @enforce_keys [:transport, :transport_pid]
   defstruct [
     :id,
+    :db,
+    :proxy,
     :transport,
     :transport_pid,
     :message,
@@ -33,6 +36,9 @@ defmodule Wampex.Router.Session do
     :name,
     :goodbye,
     roles: [Peer, Broker, Dealer],
+    registrations: [],
+    subscriptions: [],
+    invocations: [],
     message_queue: [],
     request_id: 0,
     requests: []
@@ -40,6 +46,8 @@ defmodule Wampex.Router.Session do
 
   @type t :: %__MODULE__{
           id: integer() | nil,
+          db: module() | nil,
+          proxy: module() | nil,
           transport_pid: pid() | module() | nil,
           message: Wampex.message() | nil,
           hello: Wampex.hello() | nil,
@@ -55,6 +63,9 @@ defmodule Wampex.Router.Session do
           realm: Realm.t(),
           name: module() | nil,
           roles: [module()],
+          registrations: [],
+          subscriptions: [],
+          invocations: [],
           message_queue: [],
           request_id: integer(),
           requests: []
@@ -104,7 +115,7 @@ defmodule Wampex.Router.Session do
       ) do
     debug("Init")
 
-    {:ok, %SL{data | data: %Sess{data.data | id: Enum.random(0..@max_id)}},
+    {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
      [{:next_event, :internal, :transition}]}
   end
 
@@ -131,7 +142,7 @@ defmodule Wampex.Router.Session do
     {%SL{data: sess} = data, response} = maybe_update_response(data, response)
     {requests, actions} = resp = handle_response(id, actions, requests, response)
     debug("Response: #{inspect(resp)}")
-    {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
+    {:ok, %SL{data: %Sess{sess | requests: requests}}, actions}
   end
 
   @impl true
@@ -154,14 +165,96 @@ defmodule Wampex.Router.Session do
         _,
         @register,
         %SL{
-          data: %Sess{transport: tt, transport_pid: t, register: r}
+          data:
+            %Sess{
+              transport: tt,
+              transport_pid: t,
+              registrations: regs,
+              db: db,
+              register: {:register, rid, opts, procedure}
+            } = data
         } = sl
       ) do
-    # TODO check for authentication request
-    debug("Handling Registration #{inspect(r)}")
+    debug("Handling Registration #{inspect(procedure)}")
+    id = get_global_id()
+    ClusterKV.put(db, procedure, {id, {self(), Node.self()}})
+    tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id}))
+
+    {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
+     [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_call,
+        _,
+        @call,
+        %SL{
+          data:
+            %Sess{
+              db: db,
+              proxy: proxy,
+              transport: tt,
+              transport_pid: t,
+              call: {:call, ri, dets, proc, al, akw}
+            } = data
+        } = sl
+      ) do
+    {key, [{id, {pid, node}} | _]} = ClusterKV.get(db, proc, 500)
+
+    Logger.info(
+      "Sending Invocation to #{inspect(proxy)} at node #{inspect(node)} with pid #{inspect(pid)}"
+    )
+
+    send(
+      {proxy, node},
+      {
+        {
+          %Invocation{
+            request_id: ri,
+            registration_id: id,
+            options: dets,
+            arg_list: al,
+            arg_kw: akw
+          },
+          {self(), Node.self()}
+        },
+        pid
+      }
+    )
+
     {:ok, sl, [{:next_event, :internal, :transition}]}
   end
 
+  @impl true
+  def handle_resource(
+        @handle_yield,
+        _,
+        @yield,
+        %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
+          data
+      ) do
+    {_, {pid, node}} =
+      Enum.find(inv, fn
+        {%Invocation{request_id: ^id}, _} -> true
+        _ -> false
+      end)
+
+    send(
+      {proxy, node},
+      {%Result{request_id: id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
+    )
+
+    inv =
+      Enum.filter(inv, fn
+        {%Invocation{request_id: ^id}, _} -> false
+        _ -> false
+      end)
+
+    {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
+     [{:next_event, :internal, :transition}]}
+  end
+
   @impl true
   def handle_resource(
         @handle_subscribe,
@@ -199,12 +292,35 @@ defmodule Wampex.Router.Session do
     {:ok, data, []}
   end
 
+  @impl true
+  def handle_info(
+        {%Invocation{} = e, from},
+        _,
+        %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
+      ) do
+    Logger.info("Received Invocation #{inspect(e)}")
+    tt.send_request(t, Dealer.invocation(e))
+    {:ok, %SL{data | data: %Sess{data.data | invocations: [{e, from} | inv]}}, []}
+  end
+
+  @impl true
+  def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
+    tt.send_request(t, Dealer.result(e))
+    {:ok, data, []}
+  end
+
   @impl true
   def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
     {:ok, %SL{data | data: %Sess{sess | message: message}},
      [{:next_event, :internal, :message_received}]}
   end
 
+  @impl true
+  def handle_info(event, state, data) do
+    Logger.info("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
+    {:ok, data, []}
+  end
+
   defp handle_response(nil, actions, requests, _), do: {requests, actions}
 
   defp handle_response(id, actions, requests, response) do
@@ -220,6 +336,10 @@ defmodule Wampex.Router.Session do
     end)
   end
 
+  defp get_global_id do
+    Enum.random(0..@max_id)
+  end
+
   defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
     send(from, {:progress, arg_l, arg_kw})
     []
index 54ae007adc0e801cf62c27606cee9f141aa0db2f..02173cb8640a8a6538917c1788aa11dee6134743 100644 (file)
@@ -19,15 +19,15 @@ defmodule Wampex.Router.Transports.WebSocket do
   end
 
   @impl true
-  def init(req, _state) do
+  def init(req, %{db: db, proxy: proxy}) do
     {:ok, serializer, protocol} = get_serializer(req)
     req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
-    {:cowboy_websocket, req, %WebSocket{serializer: serializer}}
+    {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
   end
 
   @impl true
-  def websocket_init(state) do
-    {:ok, session} = start_session(state)
+  def websocket_init({state, db, proxy}) do
+    {:ok, session} = start_session(db, proxy)
     Logger.info("Websocket Initialized: #{inspect(state)}")
     {:ok, %WebSocket{state | session: session}}
   end
@@ -60,8 +60,8 @@ defmodule Wampex.Router.Transports.WebSocket do
     {:ok, state}
   end
 
-  defp start_session(_state) do
-    Session.start_link(%Session{transport: WebSocket, transport_pid: self()})
+  defp start_session(db, proxy) do
+    Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
   end
 
   defp get_serializer(req) do
index 30a6f1f8349ed648deadc84749a649540bcdfce3..4bbddba389606efb12480a42ee5697abf479e0cd 100644 (file)
@@ -12,8 +12,8 @@ defmodule Wampex do
   @type authenticate :: {:authenticate, signature :: binary(), details :: map()}
 
   @type call ::
-          {:call, request_id :: integer(), details :: map(), arg_list :: arg_list(),
-           arg_keywords :: arg_keyword()}
+          {:call, request_id :: integer(), details :: map(), procedure :: String.t(),
+           arg_list :: arg_list(), arg_keywords :: arg_keyword()}
 
   @type yield ::
           {:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(),
diff --git a/mix.exs b/mix.exs
index 4de3490d56664d0ce88a42b17d7297bcde735ade..66459e18d58b8f54bf08c161fc569c0348a6e436 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -37,6 +37,9 @@ defmodule Wampex.MixProject do
 
   defp deps do
     [
+      {:cluster_kv,
+       git: "https://gitlab.com/entropealabs/cluster_kv.git",
+       tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"},
       {:cors_plug, "~> 2.0"},
       {:credo, "~> 1.2", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
index bc5f8f6bb56b027737cdcd14b411d5e0eaf1babd..2b4fe0bff355155878117f744fe35bf5c0dec7f4 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -1,6 +1,7 @@
 %{
   "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
   "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
+  "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "c274b77acb2711ac0c8a79253cb7870c606f7297", [tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"]},
   "conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
   "cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"},
   "coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"},
@@ -17,6 +18,8 @@
   "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
   "json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
   "jsx": {:hex, :jsx, "2.10.0", "77760560d6ac2b8c51fd4c980e9e19b784016aa70be354ce746472c33beb0b1c", [:rebar3], [], "hexpm", "9a83e3704807298016968db506f9fad0f027de37546eb838b3ae1064c3a0ad62"},
+  "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
+  "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
   "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"},
   "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
   "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
@@ -29,6 +32,7 @@
   "plug": {:hex, :plug, "1.9.0", "8d7c4e26962283ff9f8f3347bd73838e2413fbc38b7bb5467d5924f68f3a5a4a", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "9902eda2c52ada2a096434682e99a2493f5d06a94d6ac6bcfff9805f952350f1"},
   "plug_cowboy": {:hex, :plug_cowboy, "2.1.2", "8b0addb5908c5238fac38e442e81b6fcd32788eaa03246b4d55d147c47c5805e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7d722581ce865a237e14da6d946f92704101740a256bd13ec91e63c0b122fc70"},
   "plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"},
+  "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
   "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
   "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
   "states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
index b9b8ef4f622c97feef61f0de37811074427c47bd..a1dfa47bb37f07e0cf2bad93c7d4e4cc0b95ae3b 100644 (file)
@@ -24,7 +24,8 @@ defmodule WampexTest do
   @device "as987d9a8sd79a87ds"
 
   setup_all do
-    [server: Router.start_link(name: TestRouter, port: 4000)]
+    topologies = Application.get_env(:cluster_kv, :topologies)
+    [server: Router.start_link(name: TestRouter, port: 4000, topologies: topologies)]
   end
 
   @session %Session{url: @url, realm: @realm, roles: @roles}