-elixir 1.10.1
+elixir 1.9.4-otp-22
erlang 22.2.6
use Mix.Config
-config :wampex, state_machine: "priv/session.json"
+config :cluster_kv,
+ topologies: [
+ kv: [
+ strategy: Cluster.Strategy.Epmd,
+ config: [
+ hosts: []
+ ],
+ connect: {:net_kernel, :connect_node, []},
+ disconnect: {:erlang, :disconnect_node, []},
+ list_nodes: {:erlang, :nodes, [:connected]}
+ ]
+ ]
end
@impl true
- def handle([@call, id, dets]) do
- handle([@call, id, dets, [], %{}])
+ def handle([@call, id, dets, proc]) do
+ handle([@call, id, dets, proc, [], %{}])
end
@impl true
- def handle([@call, id, dets, arg_l]) do
- handle([@call, id, dets, arg_l, %{}])
+ def handle([@call, id, dets, proc, arg_l]) do
+ handle([@call, id, dets, proc, arg_l, %{}])
end
@impl true
- def handle([@call, id, dets, arg_l, arg_kw]) do
- {[{:next_event, :internal, :call}], id, {:update, :call, {:call, id, dets, arg_l, arg_kw}}}
+ def handle([@call, id, dets, proc, arg_l, arg_kw]) do
+ {[{:next_event, :internal, :call}], id,
+ {:update, :call, {:call, id, dets, proc, arg_l, arg_kw}}}
end
@impl true
defmodule Wampex.Router do
use Supervisor
- alias Wampex.Router.REST
+ alias Wampex.Router.{Proxy, REST}
alias Wampex.Router.Transports.WebSocket
- @spec start_link(name: module(), port: pos_integer()) ::
+ @spec start_link(name: module(), port: pos_integer(), topologies: []) ::
{:ok, pid()}
| {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
- def start_link(name: name, port: port) when is_atom(name) do
- Supervisor.start_link(__MODULE__, port, name: name)
+ def start_link(name: name, port: port, topologies: topologies) when is_atom(name) do
+ Supervisor.start_link(__MODULE__, {name, port, topologies}, name: name)
end
- @spec init(pos_integer()) ::
+ @spec init({module(), pos_integer(), list()}) ::
{:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
- def init(port) do
+ def init({name, port, topologies}) do
children = [
- {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch()]}
+ {ClusterKV,
+ [
+ name: db_name(name),
+ topologies: topologies,
+ replicas: 1,
+ quorum: 1
+ ]},
+ {Proxy, [name: proxy_name(name)]},
+ {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
]
Supervisor.init(children, strategy: :one_for_one, max_restarts: 10, max_seconds: 10)
end
- defp dispatch do
+ def db_name(name), do: Module.concat([name, KV])
+ def proxy_name(name), do: Module.concat([name, Proxy])
+
+ defp dispatch(name) do
[
{:_,
[
- {"/ws", WebSocket, []},
+ {"/ws", WebSocket, %{db: db_name(name), proxy: proxy_name(name)}},
{:_, Plug.Cowboy.Handler, {REST, []}}
]}
]
--- /dev/null
+defmodule Wampex.Router.Proxy do
+ @moduledoc false
+ use GenServer
+
+ def start_link(name: name) do
+ GenServer.start_link(__MODULE__, :ok, name: name)
+ end
+
+ def init(:ok) do
+ {:ok, []}
+ end
+
+ def handle_info({e, to}, state) do
+ send(to, e)
+ {:noreply, state}
+ end
+end
alias Wampex.Roles.Peer.{Challenge, Welcome}
alias Wampex.Router
alias __MODULE__, as: Sess
+ alias Dealer.{Invocation, Registered, Result, Unregistered}
@enforce_keys [:transport, :transport_pid]
defstruct [
:id,
+ :db,
+ :proxy,
:transport,
:transport_pid,
:message,
:name,
:goodbye,
roles: [Peer, Broker, Dealer],
+ registrations: [],
+ subscriptions: [],
+ invocations: [],
message_queue: [],
request_id: 0,
requests: []
@type t :: %__MODULE__{
id: integer() | nil,
+ db: module() | nil,
+ proxy: module() | nil,
transport_pid: pid() | module() | nil,
message: Wampex.message() | nil,
hello: Wampex.hello() | nil,
realm: Realm.t(),
name: module() | nil,
roles: [module()],
+ registrations: [],
+ subscriptions: [],
+ invocations: [],
message_queue: [],
request_id: integer(),
requests: []
) do
debug("Init")
- {:ok, %SL{data | data: %Sess{data.data | id: Enum.random(0..@max_id)}},
+ {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}},
[{:next_event, :internal, :transition}]}
end
{%SL{data: sess} = data, response} = maybe_update_response(data, response)
{requests, actions} = resp = handle_response(id, actions, requests, response)
debug("Response: #{inspect(resp)}")
- {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
+ {:ok, %SL{data: %Sess{sess | requests: requests}}, actions}
end
@impl true
_,
@register,
%SL{
- data: %Sess{transport: tt, transport_pid: t, register: r}
+ data:
+ %Sess{
+ transport: tt,
+ transport_pid: t,
+ registrations: regs,
+ db: db,
+ register: {:register, rid, opts, procedure}
+ } = data
} = sl
) do
- # TODO check for authentication request
- debug("Handling Registration #{inspect(r)}")
+ debug("Handling Registration #{inspect(procedure)}")
+ id = get_global_id()
+ ClusterKV.put(db, procedure, {id, {self(), Node.self()}})
+ tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id}))
+
+ {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
+ [{:next_event, :internal, :transition}]}
+ end
+
+ @impl true
+ def handle_resource(
+ @handle_call,
+ _,
+ @call,
+ %SL{
+ data:
+ %Sess{
+ db: db,
+ proxy: proxy,
+ transport: tt,
+ transport_pid: t,
+ call: {:call, ri, dets, proc, al, akw}
+ } = data
+ } = sl
+ ) do
+ {key, [{id, {pid, node}} | _]} = ClusterKV.get(db, proc, 500)
+
+ Logger.info(
+ "Sending Invocation to #{inspect(proxy)} at node #{inspect(node)} with pid #{inspect(pid)}"
+ )
+
+ send(
+ {proxy, node},
+ {
+ {
+ %Invocation{
+ request_id: ri,
+ registration_id: id,
+ options: dets,
+ arg_list: al,
+ arg_kw: akw
+ },
+ {self(), Node.self()}
+ },
+ pid
+ }
+ )
+
{:ok, sl, [{:next_event, :internal, :transition}]}
end
+ @impl true
+ def handle_resource(
+ @handle_yield,
+ _,
+ @yield,
+ %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
+ data
+ ) do
+ {_, {pid, node}} =
+ Enum.find(inv, fn
+ {%Invocation{request_id: ^id}, _} -> true
+ _ -> false
+ end)
+
+ send(
+ {proxy, node},
+ {%Result{request_id: id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
+ )
+
+ inv =
+ Enum.filter(inv, fn
+ {%Invocation{request_id: ^id}, _} -> false
+ _ -> false
+ end)
+
+ {:ok, %SL{data | data: %Sess{data.data | invocations: inv}},
+ [{:next_event, :internal, :transition}]}
+ end
+
@impl true
def handle_resource(
@handle_subscribe,
{:ok, data, []}
end
+ @impl true
+ def handle_info(
+ {%Invocation{} = e, from},
+ _,
+ %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
+ ) do
+ Logger.info("Received Invocation #{inspect(e)}")
+ tt.send_request(t, Dealer.invocation(e))
+ {:ok, %SL{data | data: %Sess{data.data | invocations: [{e, from} | inv]}}, []}
+ end
+
+ @impl true
+ def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
+ tt.send_request(t, Dealer.result(e))
+ {:ok, data, []}
+ end
+
@impl true
def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
{:ok, %SL{data | data: %Sess{sess | message: message}},
[{:next_event, :internal, :message_received}]}
end
+ @impl true
+ def handle_info(event, state, data) do
+ Logger.info("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
+ {:ok, data, []}
+ end
+
defp handle_response(nil, actions, requests, _), do: {requests, actions}
defp handle_response(id, actions, requests, response) do
end)
end
+ defp get_global_id do
+ Enum.random(0..@max_id)
+ end
+
defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
send(from, {:progress, arg_l, arg_kw})
[]
end
@impl true
- def init(req, _state) do
+ def init(req, %{db: db, proxy: proxy}) do
{:ok, serializer, protocol} = get_serializer(req)
req = :cowboy_req.set_resp_header(@protocol_header, protocol, req)
- {:cowboy_websocket, req, %WebSocket{serializer: serializer}}
+ {:cowboy_websocket, req, {%WebSocket{serializer: serializer}, db, proxy}}
end
@impl true
- def websocket_init(state) do
- {:ok, session} = start_session(state)
+ def websocket_init({state, db, proxy}) do
+ {:ok, session} = start_session(db, proxy)
Logger.info("Websocket Initialized: #{inspect(state)}")
{:ok, %WebSocket{state | session: session}}
end
{:ok, state}
end
- defp start_session(_state) do
- Session.start_link(%Session{transport: WebSocket, transport_pid: self()})
+ defp start_session(db, proxy) do
+ Session.start_link(%Session{db: db, proxy: proxy, transport: WebSocket, transport_pid: self()})
end
defp get_serializer(req) do
@type authenticate :: {:authenticate, signature :: binary(), details :: map()}
@type call ::
- {:call, request_id :: integer(), details :: map(), arg_list :: arg_list(),
- arg_keywords :: arg_keyword()}
+ {:call, request_id :: integer(), details :: map(), procedure :: String.t(),
+ arg_list :: arg_list(), arg_keywords :: arg_keyword()}
@type yield ::
{:yield, request_id :: integer(), details :: map(), arg_list :: arg_list(),
defp deps do
[
+ {:cluster_kv,
+ git: "https://gitlab.com/entropealabs/cluster_kv.git",
+ tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"},
{:cors_plug, "~> 2.0"},
{:credo, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
+ "cluster_kv": {:git, "https://gitlab.com/entropealabs/cluster_kv.git", "c274b77acb2711ac0c8a79253cb7870c606f7297", [tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"]},
"conv_case": {:hex, :conv_case, "0.2.2", "5a98b74ab8f7ddbad670e5c7bb39ff280e60699aa3b25c7062ceccf48137433c", [:mix], [], "hexpm", "561c550ab6d55b2a4d4c14449e58c9957798613eb26ea182e14a962965377bca"},
"cors_plug": {:hex, :cors_plug, "2.0.2", "2b46083af45e4bc79632bd951550509395935d3e7973275b2b743bd63cc942ce", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f0d0e13f71c51fd4ef8b2c7e051388e4dfb267522a83a22392c856de7e46465f"},
"coveralls": {:hex, :coveralls, "2.1.0", "b44cea21202c0e1994dfced8cedbff66bcd03eeeb565484345b28afacda0559b", [:rebar3], [{:jsx, "2.10.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "bbfbe2a7bebb2d22db15a657e6c1733243e1501714d60fe14a14790d96e7c3b2"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"json_xema": {:hex, :json_xema, "0.4.0", "377446cd5c0e2cbba52b9d7ab67c05579e6d4a788335220215a8870eac821996", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.11", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "452724a5b2751cd69191edd3fd3da0c2194c164ebd49efd85f9abb64d6621b53"},
"jsx": {:hex, :jsx, "2.10.0", "77760560d6ac2b8c51fd4c980e9e19b784016aa70be354ce746472c33beb0b1c", [:rebar3], [], "hexpm", "9a83e3704807298016968db506f9fad0f027de37546eb838b3ae1064c3a0ad62"},
+ "libcluster": {:hex, :libcluster, "3.2.0", "ed483eea1b960b89a4cee374898644930b28759226716cd7aeaf707976c5966d", [:mix], [{:jason, "~> 1.1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "350e7c2cd5311b9dfb0156d5f5778561e5833f90fabdaac3cd79ea022c4dc3dc"},
+ "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm", "1feaf05ee886815ad047cad7ede17d6910710986148ae09cf73eee2989717b81"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"plug": {:hex, :plug, "1.9.0", "8d7c4e26962283ff9f8f3347bd73838e2413fbc38b7bb5467d5924f68f3a5a4a", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "9902eda2c52ada2a096434682e99a2493f5d06a94d6ac6bcfff9805f952350f1"},
"plug_cowboy": {:hex, :plug_cowboy, "2.1.2", "8b0addb5908c5238fac38e442e81b6fcd32788eaa03246b4d55d147c47c5805e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7d722581ce865a237e14da6d946f92704101740a256bd13ec91e63c0b122fc70"},
"plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"},
+ "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
"states_language": {:hex, :states_language, "0.2.8", "f9dfd3c0bd9a9d7bda25ef315f2d90944cd6b2022a7f3c403deb1d4ec451825e", [:mix], [{:elixpath, "~> 0.1.0", [hex: :elixpath, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:json_xema, "~> 0.4.0", [hex: :json_xema, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xema, "~> 0.11.0", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "a5231691e7cb37fe32dc7de54c2dc86d1d60e84c4f0379f3246e55be2a85ec78"},
@device "as987d9a8sd79a87ds"
setup_all do
- [server: Router.start_link(name: TestRouter, port: 4000)]
+ topologies = Application.get_env(:cluster_kv, :topologies)
+ [server: Router.start_link(name: TestRouter, port: 4000, topologies: topologies)]
end
@session %Session{url: @url, realm: @realm, roles: @roles}