]> Entropealabs - wampex_router.git/commitdiff
adds cockroach backed Realms and Users, internal Admin procedure registration, ensure...
authorChristopher <chris@entropealabs.com>
Fri, 20 Mar 2020 18:15:57 +0000 (13:15 -0500)
committerChristopher <chris@entropealabs.com>
Fri, 20 Mar 2020 18:15:57 +0000 (13:15 -0500)
20 files changed:
README.md
config/config.exs
lib/roles/broker.ex
lib/roles/callee.ex
lib/roles/caller.ex
lib/roles/dealer.ex
lib/roles/peer.ex
lib/roles/publisher.ex
lib/roles/subscriber.ex
lib/router.ex
lib/router/admin.ex [new file with mode: 0644]
lib/router/authentication.ex
lib/router/authentication/ensure_default_admin.ex [new file with mode: 0644]
lib/router/authentication/realm.ex
lib/router/authentication/user.ex [new file with mode: 0644]
lib/router/ensure_default_realm.ex [deleted file]
lib/router/session.ex
lib/wampex/router/authentication/repo.ex [deleted file]
priv/repo/migrations/20200319210508_create_realms.exs [new file with mode: 0644]
test/wampex_test.exs

index bd5b551db8b2e06a97fbcd7a771f4b639225af5f..1985eac78bc7501e3ddd7080103e89d0e610a903 100644 (file)
--- a/README.md
+++ b/README.md
@@ -211,4 +211,10 @@ Warning as errors, mix format, Dialyzer, Credo and Coveralls
 $ MIX_ENV=test mix all_tests
 ```
 
+The Router uses CockroachDB
+
+```bash 
+$ docker run -p 26257:26257 -p 8080:8080 cockroachdb/cockroach-unstable:v20.1.0-beta.2 start-single-node --insecure
+```
+
 Happy hacking ;)
index a47cc7169924f77ff52acc11afb51f28300e6847..7a4a5aef36061286cb7a59da6ae3d78db2b6174a 100644 (file)
@@ -19,4 +19,6 @@ config :cluster_kv,
     ]
   ]
 
-config :wampex, ecto_repos: [Wampex.Router.Authentication.Repo]
+config :wampex,
+  ecto_repos: [Wampex.Router.Authentication.Repo],
+  keylen: 32
index 4d5daa23b3b9919762e73945ac016ca5c8424a06..89036b4a857022d8d8de4768d8d506bd4fbf7940 100644 (file)
@@ -16,13 +16,13 @@ defmodule Wampex.Roles.Broker do
   defmodule Event do
     @moduledoc false
     @enforce_keys [:subscription_id, :publication_id]
-    defstruct [:subscription_id, :publication_id, :arg_list, :arg_kw, options: %{}]
+    defstruct [:subscription_id, :publication_id, arg_list: [], arg_kw: %{}, options: %{}]
 
     @type t :: %__MODULE__{
             subscription_id: integer(),
             publication_id: integer(),
-            arg_list: nonempty_list(any()) | nil,
-            arg_kw: map() | nil,
+            arg_list: list(any()),
+            arg_kw: map(),
             options: map()
           }
   end
@@ -104,12 +104,12 @@ defmodule Wampex.Roles.Broker do
 
   @impl true
   def handle([@publish, id, opts, topic]) do
-    handle([@publish, id, opts, topic, nil, nil])
+    handle([@publish, id, opts, topic, [], %{}])
   end
 
   @impl true
   def handle([@publish, id, opts, topic, arg_l]) do
-    handle([@publish, id, opts, topic, arg_l, nil])
+    handle([@publish, id, opts, topic, arg_l, %{}])
   end
 
   @impl true
index d88777af65582a4e1ca4677b28f3e46191eba1cf..54b0fd01fca9e0991ffcadec8b8ea902f1e28233 100644 (file)
@@ -38,12 +38,12 @@ defmodule Wampex.Roles.Callee do
   defmodule Yield do
     @moduledoc false
     @enforce_keys [:request_id]
-    defstruct [:request_id, :arg_list, :arg_kw, options: %{}]
+    defstruct [:request_id, arg_list: [], arg_kw: %{}, options: %{}]
 
     @type t :: %__MODULE__{
             request_id: integer(),
-            arg_list: nonempty_list(any()) | nil,
-            arg_kw: map() | nil,
+            arg_list: list(any()),
+            arg_kw: map(),
             options: map()
           }
   end
@@ -80,12 +80,12 @@ defmodule Wampex.Roles.Callee do
 
   @impl true
   def handle([@invocation, id, reg_id, dets]) do
-    handle([@invocation, id, reg_id, dets, nil, nil])
+    handle([@invocation, id, reg_id, dets, [], %{}])
   end
 
   @impl true
   def handle([@invocation, id, reg_id, dets, arg_l]) do
-    handle([@invocation, id, dets, reg_id, arg_l, nil])
+    handle([@invocation, id, dets, reg_id, arg_l, %{}])
   end
 
   @impl true
index c662f19c69c64abc436a265d552f6cd3cb685ee5..068eb2885852c734f72c44a207d3527ce90191b0 100644 (file)
@@ -16,12 +16,12 @@ defmodule Wampex.Roles.Caller do
     @moduledoc false
     @enforce_keys [:procedure]
 
-    defstruct [:procedure, :arg_list, :arg_kw, options: %{}]
+    defstruct [:procedure, arg_list: [], arg_kw: %{}, options: %{}]
 
     @type t :: %__MODULE__{
             procedure: binary(),
-            arg_list: nonempty_list(any()) | nil,
-            arg_kw: map() | nil,
+            arg_list: list(any()),
+            arg_kw: map(),
             options: map()
           }
   end
@@ -59,12 +59,12 @@ defmodule Wampex.Roles.Caller do
 
   @impl true
   def handle([@result, id, dets]) do
-    handle([@result, id, dets, nil, nil])
+    handle([@result, id, dets, [], %{}])
   end
 
   @impl true
   def handle([@result, id, dets, arg_l]) do
-    handle([@result, id, dets, arg_l, nil])
+    handle([@result, id, dets, arg_l, %{}])
   end
 
   @impl true
index 5bd712925cde9821267b84dbbb758c03ba8089b5..d8e9ae032cde927a3c24d1424831e904723b99ac 100644 (file)
@@ -38,12 +38,12 @@ defmodule Wampex.Roles.Dealer do
   defmodule Result do
     @moduledoc false
     @enforce_keys [:request_id]
-    defstruct [:request_id, :arg_list, :arg_kw, options: %{}]
+    defstruct [:request_id, arg_list: [], arg_kw: %{}, options: %{}]
 
     @type t :: %__MODULE__{
             request_id: integer(),
-            arg_list: nonempty_list(any()) | nil,
-            arg_kw: map() | nil,
+            arg_list: list(any()),
+            arg_kw: map(),
             options: map()
           }
   end
@@ -51,13 +51,13 @@ defmodule Wampex.Roles.Dealer do
   defmodule Invocation do
     @moduledoc false
     @enforce_keys [:request_id, :registration_id]
-    defstruct [:request_id, :registration_id, :arg_list, :arg_kw, options: %{}]
+    defstruct [:request_id, :registration_id, arg_list: [], arg_kw: %{}, options: %{}]
 
     @type t :: %__MODULE__{
             request_id: integer(),
             registration_id: integer(),
-            arg_list: nonempty_list(any()) | nil,
-            arg_kw: map() | nil,
+            arg_list: list(any()),
+            arg_kw: map(),
             options: map()
           }
   end
@@ -112,12 +112,12 @@ defmodule Wampex.Roles.Dealer do
 
   @impl true
   def handle([@call, id, dets, proc]) do
-    handle([@call, id, dets, proc, nil, nil])
+    handle([@call, id, dets, proc, [], %{}])
   end
 
   @impl true
   def handle([@call, id, dets, proc, arg_l]) do
-    handle([@call, id, dets, proc, arg_l, nil])
+    handle([@call, id, dets, proc, arg_l, %{}])
   end
 
   @impl true
@@ -128,12 +128,12 @@ defmodule Wampex.Roles.Dealer do
 
   @impl true
   def handle([@yield, id, dets]) do
-    handle([@yield, id, dets, nil, nil])
+    handle([@yield, id, dets, [], %{}])
   end
 
   @impl true
   def handle([@yield, id, dets, arg_l]) do
-    handle([@yield, id, dets, arg_l, nil])
+    handle([@yield, id, dets, arg_l, %{}])
   end
 
   @impl true
index 329765e84d75bbb7a243de8c0ea08afaee6a6ece..23552fe66eaece1a9278e4f87b3a8dc3ca53eb56 100644 (file)
@@ -84,13 +84,13 @@ defmodule Wampex.Roles.Peer do
   defmodule Error do
     @moduledoc false
     @enforce_keys [:error]
-    defstruct [:request_id, :error, :arg_l, :arg_kw, details: %{}]
+    defstruct [:request_id, :error, arg_l: [], arg_kw: %{}, details: %{}]
 
     @type t :: %__MODULE__{
             request_id: integer() | nil,
             error: String.t(),
-            arg_l: list() | nil,
-            arg_kw: map() | nil,
+            arg_l: list(),
+            arg_kw: map(),
             details: map()
           }
   end
@@ -164,12 +164,12 @@ defmodule Wampex.Roles.Peer do
 
   @impl true
   def handle([@error, type, id, dets, error]) do
-    handle([@error, type, id, dets, error, nil, nil])
+    handle([@error, type, id, dets, error, [], %{}])
   end
 
   @impl true
   def handle([@error, type, id, dets, error, arg_l]) do
-    handle([@error, type, id, dets, error, arg_l, nil])
+    handle([@error, type, id, dets, error, arg_l, %{}])
   end
 
   @impl true
index 7d7b7982000f7105cf751c7b86031992863c7a2e..d31d7cac3d4711b95bb28eb81da24b408d1e1848 100644 (file)
@@ -12,12 +12,12 @@ defmodule Wampex.Roles.Publisher do
   defmodule Publish do
     @moduledoc false
     @enforce_keys [:topic]
-    defstruct [:topic, :arg_list, :arg_kw, options: %{}]
+    defstruct [:topic, arg_list: [], arg_kw: %{}, options: %{}]
 
     @type t :: %__MODULE__{
             topic: binary(),
-            arg_list: nonempty_list(any()) | nil,
-            arg_kw: map() | nil,
+            arg_list: list(any()),
+            arg_kw: map(),
             options: map()
           }
   end
index 62536ec6b06c4399dc45e8e11d0f11b76f831ac6..db3e86f74158d0f7823a3cf10e70359c6baad29f 100644 (file)
@@ -65,12 +65,12 @@ defmodule Wampex.Roles.Subscriber do
 
   @impl true
   def handle([@event, sub_id, pub_id, dets]) do
-    handle([@event, sub_id, pub_id, dets, nil, nil])
+    handle([@event, sub_id, pub_id, dets, [], %{}])
   end
 
   @impl true
   def handle([@event, sub_id, pub_id, dets, arg_l]) do
-    handle([@event, sub_id, pub_id, dets, arg_l, nil])
+    handle([@event, sub_id, pub_id, dets, arg_l, %{}])
   end
 
   @impl true
index ef901af1b0b8234a3dfc52579055f6ec25d31a25..2650155fc3651395dfaf299c69e39d21d74006e1 100644 (file)
@@ -1,7 +1,8 @@
 defmodule Wampex.Router do
   use Supervisor
 
-  alias Wampex.Router.{EnsureDefaultRealm, Authentication, Proxy, REST}
+  alias Wampex.Router.{Admin, Authentication, Proxy, REST}
+  alias Wampex.Router.Authentication.EnsureDefaultAdmin
   alias Wampex.Router.Transports.WebSocket
 
   @spec start_link(
@@ -9,21 +10,40 @@ defmodule Wampex.Router do
           port: pos_integer(),
           topologies: [],
           replicas: integer(),
-          quorum: integer()
+          quorum: integer(),
+          admin_realm: String.t(),
+          admin_authid: String.t(),
+          admin_password: String.t()
         ) ::
           {:ok, pid()}
           | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
-  def start_link(name: name, port: port, topologies: topologies, replicas: repls, quorum: quorum)
+  def start_link(
+        name: name,
+        port: port,
+        topologies: topologies,
+        replicas: repls,
+        quorum: quorum,
+        admin_realm: admin_realm,
+        admin_authid: admin_authid,
+        admin_password: admin_password
+      )
       when is_atom(name) do
-    Supervisor.start_link(__MODULE__, {name, port, topologies, repls, quorum}, name: name)
+    Supervisor.start_link(
+      __MODULE__,
+      {name, port, topologies, repls, quorum, admin_realm, admin_authid, admin_password},
+      name: name
+    )
   end
 
-  @spec init({module(), pos_integer(), list(), integer(), integer()}) ::
+  @spec init(
+          {module(), pos_integer(), list(), integer(), integer(), String.t(), String.t(),
+           String.t()}
+        ) ::
           {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
-  def init({name, port, topologies, replicas, quorum}) do
+  def init({name, port, topologies, replicas, quorum, admin_realm, admin_authid, admin_password}) do
     children = [
       Authentication.Repo,
-      {EnsureDefaultRealm, [uri: "org.entropealabs"]},
+      {EnsureDefaultAdmin, [uri: admin_realm, authid: admin_authid, password: admin_password]},
       {ClusterKV,
        [
          name: db_name(name),
@@ -32,6 +52,8 @@ defmodule Wampex.Router do
          quorum: quorum
        ]},
       {Proxy, [name: proxy_name(name)]},
+      {Admin,
+       [name: admin_name(name), db: db_name(name), realm: admin_realm, proxy: proxy_name(name)]},
       {Plug.Cowboy, scheme: :http, plug: REST, options: [port: port, dispatch: dispatch(name)]}
     ]
 
@@ -40,6 +62,7 @@ defmodule Wampex.Router do
 
   def db_name(name), do: Module.concat([name, KV])
   def proxy_name(name), do: Module.concat([name, Proxy])
+  def admin_name(name), do: Module.concat([name, Admin])
 
   defp dispatch(name) do
     [
diff --git a/lib/router/admin.ex b/lib/router/admin.ex
new file mode 100644 (file)
index 0000000..51866dd
--- /dev/null
@@ -0,0 +1,49 @@
+defmodule Wampex.Router.Admin do
+  @moduledoc false
+  use GenServer
+  require Logger
+  alias Wampex.Roles.Dealer.{Invocation, Result}
+  alias Wampex.Router.Authentication.{User, Realm}
+  alias Wampex.Router.Session
+
+  @procedures [
+    "admin.create_user",
+    "admin.create_realm"
+  ]
+
+  def start_link(name: name, db: db, realm: realm, proxy: proxy) do
+    GenServer.start_link(__MODULE__, {db, realm, proxy}, name: name)
+  end
+
+  def init({db, realm, proxy}) do
+    {:ok, %{db: db, realm: realm, proxy: proxy, regs: []}, {:continue, :ok}}
+  end
+
+  def handle_continue(:ok, %{db: db, realm: realm, regs: regs} = state) do
+    regs = register_procedures(db, realm, regs)
+    {:noreply, %{state | regs: regs}}
+  end
+
+  def register_procedures(db, realm, regs) do
+    Enum.reduce(@procedures, regs, fn proc, acc ->
+      val = {Session.get_global_id(), {self(), Node.self()}}
+      ClusterKV.put(db, realm, proc, val)
+      [{proc, val} | acc]
+    end)
+  end
+
+  def handle_info(
+        {req_id,
+         %Invocation{
+           arg_kw: %{"realm" => realm, "authid" => authid, "password" => password},
+           options: %{"procedure" => "admin.create_user"}
+         } = event, {pid, node}},
+        %{proxy: proxy} = state
+      ) do
+    realm = Realm.get(uri: realm)
+    %User{id: id} = User.create(authid: authid, password: password, realm: realm)
+    Logger.info("Admin handled event: #{inspect(event)}")
+    send({proxy, node}, {%Result{request_id: req_id, arg_list: [id]}, pid})
+    {:noreply, state}
+  end
+end
index 1ffafe6cf82443a340c3260b2a2dc5c1c91bc707..5ebf9705996bcbebe4fff7911724fe1533f08b77 100644 (file)
@@ -5,10 +5,9 @@ defmodule Wampex.Router.Authentication do
 
   alias Wampex.Crypto
   alias Wampex.Serializers.JSON
+  alias Wampex.Router.Authentication.{Realm, User}
 
   @wampcra "wampcra"
-  @salt_length 16
-  @key_length 32
   @auth_provider "userdb"
   @auth_role "user"
 
@@ -18,15 +17,15 @@ defmodule Wampex.Router.Authentication do
 
   def method, do: @wampcra
 
-  def challenge(authid, session_id) do
-    nonce = Crypto.random_string(@salt_length)
-    salt = Crypto.random_string(@salt_length)
+  def challenge(realm, authid, session_id) do
+    %Realm{} = realm = Realm.get(uri: realm)
+    %User{} = user = User.get(authid: authid, realm: realm)
     now = DateTime.to_iso8601(DateTime.utc_now())
 
     %{
       challenge:
         JSON.serialize!(%{
-          nonce: nonce,
+          nonce: Crypto.random_string(user.keylen),
           authprovider: @auth_provider,
           authid: authid,
           timestamp: now,
@@ -34,26 +33,26 @@ defmodule Wampex.Router.Authentication do
           authmethod: @wampcra,
           session: session_id
         }),
-      salt: salt,
-      keylen: @key_length,
-      iterations: Enum.random(10_000..22_000)
+      salt: user.salt,
+      keylen: user.keylen,
+      iterations: user.iterations
     }
   end
 
-  def authenticate(signature, authid, %{
-        challenge: challenge,
-        salt: salt,
-        keylen: keylen,
-        iterations: iters
+  def authenticate(signature, realm, authid, %{
+        challenge: challenge
       }) do
     authid
-    |> get_secret()
-    |> Crypto.pbkdf2(salt, iters, keylen)
+    |> get_secret(realm)
     |> Crypto.hash_challenge(challenge)
     |> :pbkdf2.compare_secure(signature)
   end
 
-  defp get_secret(_authid), do: "test1234"
+  defp get_secret(authid, uri) do
+    realm = Realm.get(uri: uri)
+    %User{password: password} = User.get(authid: authid, realm: realm)
+    password
+  end
 
   defp can_auth([]), do: false
   defp can_auth([@wampcra | _]), do: true
diff --git a/lib/router/authentication/ensure_default_admin.ex b/lib/router/authentication/ensure_default_admin.ex
new file mode 100644 (file)
index 0000000..9052191
--- /dev/null
@@ -0,0 +1,19 @@
+defmodule Wampex.Router.Authentication.EnsureDefaultAdmin do
+  @moduledoc false
+  require Logger
+  use GenServer
+
+  alias Wampex.Router.Authentication.{Realm, User}
+
+  def start_link(uri: uri, authid: authid, password: password) do
+    GenServer.start_link(__MODULE__, {uri, authid, password})
+  end
+
+  def init({uri, authid, password}) do
+    %Realm{} = realm = Realm.create(uri: uri)
+    %User{} = user = User.create(authid: authid, password: password, realm: realm)
+    Logger.info("Realm: #{inspect(realm)}")
+    Logger.info("User: #{inspect(user)}")
+    :ignore
+  end
+end
index 0c10f0b2e51ee8a8e73a711ae14eb6c6f3513988..10fa68bc340b82f0684029c7b63cd3bacdf7212f 100644 (file)
@@ -1,10 +1,41 @@
 defmodule Wampex.Router.Authentication.Realm do
   @moduledoc """
-  CREATE TABLE IF NOT EXISTS authentication.realms (uri STRING PRIMARY KEY, parent STRING);
+  CREATE TABLE authentication.realms (
+        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+        uri STRING(255) NOT NULL UNIQUE,
+        parent STRING NULL,
+        inserted_at TIMESTAMP NOT NULL,
+        updated_at TIMESTAMP NOT NULL
+  );
+
   """
   use Ecto.Schema
-  @primary_key {:uri, :string, autogenerate: false, read_after_writes: true}
+
+  alias __MODULE__
+  alias Wampex.Router.Authentication.Repo
+
+  @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
+  @foreign_key_type :binary_id
   schema "realms" do
+    field(:uri, :string)
     field(:parent, :string)
+    timestamps()
+  end
+
+  def get(uri: uri) do
+    Repo.get_by(Realm, uri: uri)
+  end
+
+  def create(uri: uri) do
+    try do
+      {:ok, r} =
+        %Realm{uri: uri, parent: uri}
+        |> Repo.insert()
+
+      r
+    rescue
+      _er ->
+        get(uri: uri)
+    end
   end
 end
diff --git a/lib/router/authentication/user.ex b/lib/router/authentication/user.ex
new file mode 100644 (file)
index 0000000..ea470af
--- /dev/null
@@ -0,0 +1,66 @@
+defmodule Wampex.Router.Authentication.User do
+  @moduledoc """
+  CREATE TABLE authentication.users (
+        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+        authid STRING(255) NOT NULL,
+        password STRING NOT NULL,
+        salt STRING NOT NULL,
+        iterations INT NOT NULL,
+        keylen INT NOT NULL,
+        realm_id UUID NOT NULL REFERENCES authentication.realms (id) ON DELETE CASCADE,
+        inserted_at TIMESTAMP NOT NULL,
+        updated_at TIMESTAMP NOT NULL,
+        UNIQUE (authid, realm_id),
+        INDEX (authid)
+  );
+
+  """
+
+  use Ecto.Schema
+  alias __MODULE__
+  alias Ecto.Migration
+  alias Wampex.Crypto
+  alias Wampex.Router.Authentication.{Realm, Repo}
+
+  @primary_key {:id, :binary_id, autogenerate: false, read_after_writes: true}
+  @foreign_key_type :binary_id
+  schema "users" do
+    field(:authid, :string)
+    field(:password, :string)
+    field(:salt, :string)
+    field(:iterations, :integer)
+    field(:keylen, :integer)
+    belongs_to(:realm, Realm)
+    timestamps()
+  end
+
+  def get(authid: authid, realm: realm) do
+    Repo.get_by(User, authid: authid, realm_id: realm.id)
+  end
+
+  def create(authid: authid, password: password, realm: realm) do
+    keylen = Application.get_env(:wampex, :keylen)
+    salt = Crypto.random_string(keylen)
+    iterations = Enum.random(10_000..50_000)
+
+    try do
+      password = Crypto.pbkdf2(password, salt, iterations, keylen)
+
+      {:ok, u} =
+        %User{
+          authid: authid,
+          password: password,
+          realm: realm,
+          salt: salt,
+          iterations: iterations,
+          keylen: keylen
+        }
+        |> Repo.insert()
+
+      u
+    rescue
+      _er ->
+        get(authid: authid, realm: realm)
+    end
+  end
+end
diff --git a/lib/router/ensure_default_realm.ex b/lib/router/ensure_default_realm.ex
deleted file mode 100644 (file)
index bc5e8a3..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-defmodule Wampex.Router.EnsureDefaultRealm do
-  @moduledoc false
-  require Logger
-  use GenServer
-
-  alias Wampex.Router.Authentication.{Realm, Repo}
-
-  def start_link(uri: uri) do
-    GenServer.start_link(__MODULE__, uri)
-  end
-
-  def init(uri) do
-    try do
-      %Realm{uri: uri}
-      |> Repo.insert()
-    rescue
-      er -> Logger.warn("#{inspect(er)}")
-    end
-
-    %Realm{uri: _uri} =
-      Realm
-      |> Ecto.Query.first()
-      |> Repo.one()
-
-    :ignore
-  end
-end
index 0727349b8da8160269aa91b8d735f0617cf8ad21..2cd7847049da51d0b977ed4eeeb11586bd3878a6 100644 (file)
@@ -8,7 +8,6 @@ defmodule Wampex.Router.Session do
 
   alias __MODULE__, as: Sess
   alias StatesLanguage, as: SL
-  alias Wampex.Realm
   alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
   alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
   alias Wampex.Router
@@ -67,7 +66,7 @@ defmodule Wampex.Router.Session do
           unsubscribe: Wampex.unsubscribe() | nil,
           publish: Wampex.publish() | nil,
           goodbye: binary() | nil,
-          realm: Realm.t(),
+          realm: String.t() | nil,
           name: module() | nil,
           challenge: map() | nil,
           roles: [module()],
@@ -114,6 +113,10 @@ defmodule Wampex.Router.Session do
   @handle_goodbye "HandleGoodbye"
   # @handle_cancel "HandleCancel"
 
+  def get_global_id do
+    Enum.random(0..@max_id)
+  end
+
   @impl true
   def handle_resource(
         @handle_init,
@@ -185,7 +188,7 @@ defmodule Wampex.Router.Session do
         %{"authid" => ai, "authmethods" => am} ->
           case auth.authenticate?(am) do
             true ->
-              ch = auth.challenge(ai, id)
+              ch = auth.challenge(realm, ai, id)
 
               chal = %Challenge{
                 auth_method: auth.method(),
@@ -233,6 +236,7 @@ defmodule Wampex.Router.Session do
             transport_pid: t,
             authentication: auth,
             challenge: challenge,
+            realm: realm,
             authenticate: {:authenticate, sig, _dets}
           }
         } = sl
@@ -244,7 +248,7 @@ defmodule Wampex.Router.Session do
     authprovider = get_in(ch, ["authprovider"])
 
     actions =
-      case auth.authenticate(sig, authid, challenge) do
+      case auth.authenticate(sig, realm, authid, challenge) do
         true ->
           send_to_peer(
             Peer.welcome(%Welcome{
@@ -445,7 +449,8 @@ defmodule Wampex.Router.Session do
       ) do
     id = get_global_id()
     ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
-    send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t)
+    regd = %Registered{request_id: rid, registration_id: id}
+    send_to_peer(Dealer.registered(regd), tt, t)
 
     {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
      [{:next_event, :internal, :transition}]}
@@ -473,6 +478,7 @@ defmodule Wampex.Router.Session do
       with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
            {id, {pid, node}} <- get_live_callee(proxy, callees) do
         req_id = get_request_id(ri)
+        dets = Map.put(dets, "procedure", proc)
 
         send(
           {proxy, node},
@@ -723,17 +729,34 @@ defmodule Wampex.Router.Session do
     transport.send_request(pid, remove_nil_values(msg))
   end
 
-  defp get_global_id do
-    Enum.random(0..@max_id)
-  end
-
   defp maybe_update_response(data, {:update, key, resp}) do
     {%SL{data | data: Map.put(data.data, key, resp)}, resp}
   end
 
   defp maybe_update_response(data, resp), do: {data, resp}
 
-  defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
+  defp remove_nil_values(message) do
+    message =
+      case Enum.reverse(message) do
+        [map | t] when map == %{} -> t
+        [nil | t] -> t
+        m -> m
+      end
+
+    message =
+      case message do
+        [list | t] when list == [] ->
+          t
+
+        [nil | t] ->
+          t
+
+        m ->
+          m
+      end
+
+    Enum.reverse(message)
+  end
 
   defp get_request_id(current_id) when current_id == @max_id do
     1
diff --git a/lib/wampex/router/authentication/repo.ex b/lib/wampex/router/authentication/repo.ex
deleted file mode 100644 (file)
index 9fe8e40..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-defmodule Wampex.Router.Authentication.Repo do
-  use Ecto.Repo,
-    otp_app: :wampex,
-    adapter: Ecto.Adapters.Postgres
-end
diff --git a/priv/repo/migrations/20200319210508_create_realms.exs b/priv/repo/migrations/20200319210508_create_realms.exs
new file mode 100644 (file)
index 0000000..0ec59d6
--- /dev/null
@@ -0,0 +1,28 @@
+defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do
+  use Ecto.Migration
+
+  def change do
+    create table("realms", primary_key: false) do
+      add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
+      add(:uri, :string, null: false)
+      add(:parent, :string, null: true)
+      timestamps()
+    end
+
+    create(unique_index(:realms, [:uri]))
+
+    create table("users", primary_key: false) do
+      add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
+      add(:authid, :string, null: false)
+      add(:password, :string, null: false)
+      add(:salt, :string, null: false)
+      add(:iterations, :integer, null: false)
+      add(:keylen, :integer, null: false)
+      add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all))
+      timestamps()
+    end
+
+    create(index(:users, [:authid]))
+    create(unique_index(:users, [:authid, :realm_id]))
+  end
+end
index 24c4631834f80eeee29e1e9b2c4d10e4c20749c5..4e874f9e25fa4052ad709821944e110ec4dcef18 100644 (file)
@@ -18,8 +18,11 @@ defmodule WampexTest do
   require Logger
 
   @url "ws://localhost:4000/ws"
-  @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"}
-  @realm %Realm{name: "org.entropealabs.iot", authentication: @auth}
+  @authid "admin"
+  @auth_password "test1234"
+  @realm_uri "org.entropealabs.admin"
+  @auth %Authentication{authid: @authid, authmethods: ["wampcra"], secret: @auth_password}
+  @realm %Realm{name: @realm_uri, authentication: @auth}
   @roles [Callee, Caller, Publisher, Subscriber]
   @device "as987d9a8sd79a87ds"
 
@@ -33,7 +36,10 @@ defmodule WampexTest do
           port: 4000,
           topologies: topologies,
           replicas: 1,
-          quorum: 1
+          quorum: 1,
+          admin_realm: @realm_uri,
+          admin_authid: @authid,
+          admin_password: @auth_password
         )
     ]
   end
@@ -77,8 +83,8 @@ defmodule WampexTest do
 
   @tag :client
   test "Callee.yield" do
-    assert [70, 1234, %{}, nil, nil] = Callee.yield(%Yield{request_id: 1234})
-    assert [70, 1234, %{}, ["1"], nil] = Callee.yield(%Yield{request_id: 1234, arg_list: ["1"]})
+    assert [70, 1234, %{}, [], %{}] = Callee.yield(%Yield{request_id: 1234})
+    assert [70, 1234, %{}, ["1"], %{}] = Callee.yield(%Yield{request_id: 1234, arg_list: ["1"]})
 
     assert [70, 1234, %{test: 1}, [2], %{}] =
              Callee.yield(%Yield{
@@ -96,8 +102,8 @@ defmodule WampexTest do
 
   @tag :client
   test "Caller.call" do
-    assert [48, %{}, "test", nil, nil] = Caller.call(%Call{procedure: "test"})
-    assert [48, %{}, "test", [1], nil] = Caller.call(%Call{procedure: "test", arg_list: [1]})
+    assert [48, %{}, "test", [], %{}] = Caller.call(%Call{procedure: "test"})
+    assert [48, %{}, "test", [1], %{}] = Caller.call(%Call{procedure: "test", arg_list: [1]})
 
     assert [48, %{ok: 1}, "test", [1], %{test: 1}] =
              Caller.call(%Call{
@@ -137,8 +143,8 @@ defmodule WampexTest do
 
   @tag :client
   test "Publisher.publish" do
-    assert [16, %{}, "test", nil, nil] = Publisher.publish(%Publish{topic: "test"})
-    assert [16, %{}, "test", [1], nil] = Publisher.publish(%Publish{topic: "test", arg_list: [1]})
+    assert [16, %{}, "test", [], %{}] = Publisher.publish(%Publish{topic: "test"})
+    assert [16, %{}, "test", [1], %{}] = Publisher.publish(%Publish{topic: "test", arg_list: [1]})
 
     assert [16, %{test: 2}, "test", [1], %{test: 1}] =
              Publisher.publish(%Publish{
@@ -183,8 +189,8 @@ defmodule WampexTest do
 
   @tag :router
   test "Dealer.result" do
-    assert [50, 1234, %{}, nil, nil] = Dealer.result(%Result{request_id: 1234})
-    assert [50, 1234, %{}, ["1"], nil] = Dealer.result(%Result{request_id: 1234, arg_list: ["1"]})
+    assert [50, 1234, %{}, [], %{}] = Dealer.result(%Result{request_id: 1234})
+    assert [50, 1234, %{}, ["1"], %{}] = Dealer.result(%Result{request_id: 1234, arg_list: ["1"]})
 
     assert [50, 1234, %{test: 1}, [2], %{}] =
              Dealer.result(%Result{
@@ -197,10 +203,10 @@ defmodule WampexTest do
 
   @tag :router
   test "Dealer.invocation" do
-    assert [68, 1234, 1234, %{}, nil, nil] =
+    assert [68, 1234, 1234, %{}, [], %{}] =
              Dealer.invocation(%Invocation{request_id: 1234, registration_id: 1234})
 
-    assert [68, 1234, 1234, %{}, ["1"], nil] =
+    assert [68, 1234, 1234, %{}, ["1"], %{}] =
              Dealer.invocation(%Invocation{
                request_id: 1234,
                registration_id: 1234,
@@ -224,10 +230,10 @@ defmodule WampexTest do
 
   @tag :router
   test "Broker.event" do
-    assert [36, 123_456, 765_432, %{}, nil, nil] =
+    assert [36, 123_456, 765_432, %{}, [], %{}] =
              Broker.event(%Event{subscription_id: 123_456, publication_id: 765_432})
 
-    assert [36, 123_456, 765_432, %{}, [2], nil] =
+    assert [36, 123_456, 765_432, %{}, [2], %{}] =
              Broker.event(%Event{subscription_id: 123_456, publication_id: 765_432, arg_list: [2]})
 
     assert [36, 123_456, 765_432, %{}, [2], %{test: 1}] =
@@ -312,12 +318,12 @@ defmodule WampexTest do
 
   @tag :client
   test "caller receives error when calling unknown procedure" do
-    caller_name = TestCaller
+    caller_name = TestExistCaller
     Client.start_link(name: caller_name, session: @session)
 
     assert {:error, 48, "wamp.error.no_registration", %{"procedure" => "this.should.not.exist"},
-            nil,
-            nil} =
+            [],
+            %{}} =
              Client.send_request(
                caller_name,
                Caller.call(%Call{
@@ -328,6 +334,23 @@ defmodule WampexTest do
              )
   end
 
+  @tag :client
+  test "admin callee is invoked and responds and caller gets result" do
+    caller_name = TestAdminCaller
+    Client.start_link(name: caller_name, session: @session)
+
+    {:ok, %{}, [id], %{}} =
+      Client.send_request(
+        caller_name,
+        Caller.call(%Call{
+          procedure: "admin.create_user",
+          arg_kw: %{authid: "chris", password: "woot!", realm: @realm_uri}
+        })
+      )
+
+    assert is_binary(id)
+  end
+
   @tag :client
   test "callee is invoked and responds and caller gets result" do
     callee_name = TestCalleeRespond