Handles requests and responses for a Broker
"""
alias Wampex.Role
+ alias Wampex.Roles.Peer.Error
alias Wampex.Roles.Publisher.Publish
alias Wampex.Roles.Subscriber.{Subscribe, Unsubscribe}
@behaviour Role
+ @error 8
@publish 16
@published 17
@subscribe 32
[@subscribed, ri, si]
end
+ @spec subscribe_error(Error.t()) :: Wampex.message()
+ def subscribe_error(%Error{request_id: rid, error: er, details: dets}) do
+ [@error, @subscribe, rid, dets, er]
+ end
+
@spec unsubscribed(Unsubscribed.t()) :: Wampex.message()
def unsubscribed(%Unsubscribed{request_id: ri}) do
[@unsubscribed, ri]
alias Wampex.Role
alias Wampex.Roles.Dealer.{Invocation}
+ alias Wampex.Roles.Peer.Error
@behaviour Role
+ @error 8
@register 64
@registered 65
@unregister 66
[@yield, ri, opts, al, akw]
end
+ @spec invocation_error(Error.t()) :: Wampex.message()
+ def invocation_error(%Error{
+ request_id: rid,
+ error: er,
+ details: dets,
+ arg_list: al,
+ arg_kw: akw
+ }) do
+ [@error, @invocation, rid, dets, er, al, akw]
+ end
+
@impl true
def handle(<<@unregistered, request_id>>) do
{[{:next_event, :internal, :established}], request_id, {:ok, request_id}}
alias Wampex.Role
alias Wampex.Roles.Dealer.Result
- alias Wampex.Roles.Peer.Error
@behaviour Role
@call 48
@cancel 49
- @error 8
@result 50
defmodule Call do
[@call, opts, p, al, akw]
end
- @spec call_error(Error.t()) :: Wampex.message()
- def call_error(%Error{request_id: rid, error: er, details: dets, arg_list: al, arg_kw: akw}) do
- [@error, @call, rid, dets, er, al, akw]
- end
-
@spec cancel(Cancel.t()) :: Wampex.message()
def cancel(%Cancel{request_id: ri, options: opts}) do
[@cancel, ri, opts]
alias Wampex.Role
alias Wampex.Roles.Callee.{Register, Unregister, Yield}
alias Wampex.Roles.Caller.Call
+ alias Wampex.Roles.Peer.Error
@behaviour Role
+ @error 8
@call 48
@result 50
@register 64
[@invocation, ri, reg_id, opts, al, akw]
end
+ @spec call_error(Error.t()) :: Wampex.message()
+ def call_error(%Error{request_id: rid, error: er, details: dets, arg_list: al, arg_kw: akw}) do
+ [@error, @call, rid, dets, er, al, akw]
+ end
+
+ @spec register_error(Error.t()) :: Wampex.message()
+ def register_error(%Error{request_id: rid, error: er, details: dets}) do
+ [@error, @register, rid, dets, er]
+ end
+
@spec result(Result.t()) :: Wampex.message()
def result(%Result{
request_id: ri,
@authenticate 5
@goodbye 6
@error 8
+ @invocation 68
defmodule Hello do
@moduledoc false
handle([@error, type, id, dets, error, arg_l, %{}])
end
+ @impl true
+ def handle([@error, @invocation, id, dets, err, arg_l, arg_kw]) do
+ {[{:next_event, :internal, :invocation_error}], id,
+ {:update, :error,
+ %Error{
+ type: @invocation,
+ request_id: id,
+ error: err,
+ details: dets,
+ arg_list: arg_l,
+ arg_kw: arg_kw
+ }}}
+ end
+
@impl true
def handle([@error, type, id, dets, error, arg_l, arg_kw]) do
{[{:next_event, :internal, :established}], id,
+++ /dev/null
-{
- "Comment": "Session State Machine",
- "StartAt": "WaitForTransport",
- "States": {
- "WaitForTransport": {
- "Type": "Task",
- "Resource": "InitTransport",
- "TransitionEvent": "{:connected, true}",
- "Next": "Init",
- "Catch": [
- {
- "ErrorEquals": ["{:connected, false}"],
- "Next": "Abort"
- }
- ]
- },
- "Init": {
- "Type": "Task",
- "Resource": "Hello",
- "TransitionEvent": ":hello_sent",
- "Next": "Handshake",
- "Catch": [
- {
- "ErrorEquals": [":abort"],
- "Next": "Abort"
- }
-
- ]
- },
- "Handshake": {
- "Type": "Choice",
- "Resource": "HandleHandshake",
- "Choices": [
- {
- "StringEquals": ":message_received",
- "Next": "Message"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- }
- ]
- },
- "Established": {
- "Type": "Choice",
- "Resource": "HandleEstablished",
- "Choices": [
- {
- "StringEquals": ":message_received",
- "Next": "Message"
- },
- {
- "StringEquals": ":goodbye",
- "Next": "GoodBye"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- }
- ]
- },
- "Message": {
- "Type": "Choice",
- "Resource": "HandleMessage",
- "Choices": [
- {
- "StringEquals": ":established",
- "Next": "Established"
- },
- {
- "StringEquals": ":event",
- "Next": "Event"
- },
- {
- "StringEquals": ":invocation",
- "Next": "Invocation"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- },
- {
- "StringEquals": ":goodbye",
- "Next": "GoodBye"
- },
- {
- "StringEquals": ":challenge",
- "Next": "Challenge"
- },
- {
- "StringEquals": ":interrupt",
- "Next": "Interrupt"
- }
- ]
- },
- "Interrupt": {
- "Type": "Task",
- "Resource": "HandleInterrupt",
- "Next": "Established"
- },
- "Challenge": {
- "Type": "Task",
- "Resource": "HandleChallenge",
- "Next": "Handshake",
- "TransitionEvent": ":challenged"
- },
- "Event": {
- "Type": "Task",
- "Resource": "HandleEvent",
- "Next": "Established"
- },
- "Invocation": {
- "Type": "Task",
- "Resource": "HandleInvocation",
- "Next": "Established"
- },
- "GoodBye": {
- "Type": "Task",
- "Resource": "GoodBye",
- "End": true
- },
- "Abort": {
- "Type": "Task",
- "Resource": "HandleAbort",
- "End": true
- }
- }
-}
+++ /dev/null
-defmodule Wampex.Router.Authentication.Repo.Migrations.CreateRealms do
- use Ecto.Migration
-
- def change do
- create table("realms", primary_key: false) do
- add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
- add(:uri, :string, null: false)
- add(:parent, :string, null: true)
- timestamps()
- end
-
- create(unique_index(:realms, [:uri]))
-
- create table("users", primary_key: false) do
- add(:id, :binary_id, primary_key: true, default: fragment("gen_random_uuid()"))
- add(:authid, :string, null: false)
- add(:password, :string, null: false)
- add(:salt, :string, null: false)
- add(:iterations, :integer, null: false)
- add(:keylen, :integer, null: false)
- add(:realm_id, references(:realms, type: :binary_id, on_delete: :delete_all), null: false)
- timestamps()
- end
-
- create(index(:users, [:authid]))
- create(unique_index(:users, [:authid, :realm_id]))
- end
-end
+++ /dev/null
-{
- "Comment": "Router Session State Machine",
- "StartAt": "Init",
- "States": {
- "Init": {
- "Type": "Task",
- "Resource": "HandleInit",
- "Next": "Established"
- },
- "Established": {
- "Type": "Choice",
- "Resource": "HandleEstablished",
- "Choices": [
- {
- "StringEquals": ":message_received",
- "Next": "Message"
- },
- {
- "StringEquals": ":goodbye",
- "Next": "GoodBye"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- }
- ]
- },
- "Message": {
- "Type": "Choice",
- "Resource": "HandleMessage",
- "Choices": [
- {
- "StringEquals": ":hello",
- "Next": "Hello"
- },
- {
- "StringEquals": ":authenticate",
- "Next": "Authenticate"
- },
- {
- "StringEquals": ":call",
- "Next": "Call"
- },
- {
- "StringEquals": ":yield",
- "Next": "Yield"
- },
- {
- "StringEquals": ":register",
- "Next": "Register"
- },
- {
- "StringEquals": ":unregister",
- "Next": "Unregister"
- },
- {
- "StringEquals": ":subscribe",
- "Next": "Subscribe"
- },
- {
- "StringEquals": ":unsubscribe",
- "Next": "Unsubscribe"
- },
- {
- "StringEquals": ":publish",
- "Next": "Publish"
- },
- {
- "StringEquals": ":error",
- "Next": "Error"
- },
- {
- "StringEquals": ":abort",
- "Next": "Abort"
- },
- {
- "StringEquals": ":goodbye",
- "Next": "GoodBye"
- },
- {
- "StringEquals": ":cancel",
- "Next": "Cancel"
- }
- ]
- },
- "Hello": {
- "Type": "Task",
- "Resource": "HandleHello",
- "Next": "Established"
- },
- "Authenticate": {
- "Type": "Task",
- "Resource": "HandleAuthenticate",
- "Next": "Established"
- },
- "Call": {
- "Type": "Task",
- "Resource": "HandleCall",
- "Next": "Established",
- "Catch": [
- {
- "ErrorEquals": ["{:error, :no_live_callees}"],
- "Next": "Error"
- }
- ]
- },
- "Yield": {
- "Type": "Task",
- "Resource": "HandleYield",
- "Next": "Established"
- },
- "Register":{
- "Type": "Task",
- "Resource": "HandleRegister",
- "Next": "Established"
- },
- "Unregister":{
- "Type": "Task",
- "Resource": "HandleUnregister",
- "Next": "Established"
- },
- "Subscribe":{
- "Type": "Task",
- "Resource": "HandleSubscribe",
- "Next": "Established"
- },
- "Unsubscribe":{
- "Type": "Task",
- "Resource": "HandleUnsubscribe",
- "Next": "Established"
- },
- "Publish":{
- "Type": "Task",
- "Resource": "HandlePublish",
- "Next": "Established"
- },
- "Error":{
- "Type": "Task",
- "Resource": "HandleError",
- "Next": "Established"
- },
- "GoodBye": {
- "Type": "Task",
- "Resource": "HandleGoodbye",
- "End": true
- },
- "Abort": {
- "Type": "Task",
- "Resource": "HandleAbort",
- "End": true
- }
- }
-}
alias Callee.{Register, Unregister, Yield}
alias Caller.Call
alias Dealer.{Invocation, Registered, Result, Unregistered}
- alias Peer.{Authenticate, Goodbye, Hello}
+ alias Peer.{Authenticate, Error, Goodbye, Hello}
alias Publisher.Publish
alias Subscriber.{Subscribe, Unsubscribe}
require Logger
assert %{callee: %{}} = Callee.add(%{})
end
+ test "Callee.invocation_error" do
+ assert [8, 68, 1234, %{}, "error", [], %{}] =
+ Callee.invocation_error(%Error{request_id: 1234, error: "error"})
+ end
+
test "Callee.register" do
assert [64, %{}, "test"] = Callee.register(%Register{procedure: "test"})
end
assert [67, 123_456] = Dealer.unregistered(%Unregistered{request_id: 123_456})
end
+ test "Dealer.call_error" do
+ assert [8, 48, 123_456, %{}, "error", [], %{}] =
+ Dealer.call_error(%Error{request_id: 123_456, details: %{}, error: "error"})
+ end
+
+ test "Dealer.register_error" do
+ assert [8, 64, 123_456, %{}, "error"] =
+ Dealer.register_error(%Error{request_id: 123_456, details: %{}, error: "error"})
+ end
+
test "Dealer.result" do
assert [50, 1234, %{}, [], %{}] = Dealer.result(%Result{request_id: 1234})
assert [50, 1234, %{}, ["1"], %{}] = Dealer.result(%Result{request_id: 1234, arg_list: ["1"]})
assert [35, 123_456] = Broker.unsubscribed(%Unsubscribed{request_id: 123_456})
end
+ test "Broker.subscribe_error" do
+ assert [8, 32, 123_456, %{}, "error"] =
+ Broker.subscribe_error(%Error{request_id: 123_456, details: %{}, error: "error"})
+ end
+
test "Broker.published" do
assert [17, 123_456, 654_321] =
Broker.published(%Published{request_id: 123_456, publication_id: 654_321})