}
end
+ defmodule Error do
+ @moduledoc false
+ @enforce_keys [:error]
+ defstruct [:request_id, :error, :arg_l, :arg_kw, details: %{}]
+
+ @type t :: %__MODULE__{
+ request_id: integer() | nil,
+ error: String.t(),
+ arg_l: list() | nil,
+ arg_kw: map() | nil,
+ details: map()
+ }
+ end
+
@impl true
def add(roles), do: roles
end
@impl true
- def handle([@error, type, id, _dets, error, arg_l, arg_kw]) do
- {[{:next_event, :internal, :established}], id, {:error, type, error, arg_l, arg_kw}}
+ def handle([@error, type, id, dets, error, arg_l, arg_kw]) do
+ {[{:next_event, :internal, :established}], id, {:error, type, error, dets, arg_l, arg_kw}}
end
end
alias __MODULE__, as: Sess
alias StatesLanguage, as: SL
alias Wampex.Realm
- alias Wampex.Roles.{Broker, Dealer, Peer}
- alias Wampex.Roles.Peer.{Challenge, Welcome, Abort}
+ alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
+ alias Wampex.Roles.Peer.{Abort, Challenge, Error, Welcome}
alias Wampex.Router
alias Wampex.Serializers.JSON
alias Broker.{Subscribed, Published, Event}
%Sess{
db: db,
proxy: proxy,
- transport: _tt,
- transport_pid: _t,
+ transport: tt,
+ transport_pid: t,
realm: realm,
request_id: ri,
call: {:call, call_id, dets, proc, al, akw}
} = data
} = sl
) do
- {_key, callees} = ClusterKV.get(db, realm, proc, 500)
-
- {data, actions} =
- case get_live_callee(proxy, callees) do
- {id, {pid, node}} ->
- req_id = get_request_id(ri)
+ data =
+ with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
+ {id, {pid, node}} <- get_live_callee(proxy, callees) do
+ req_id = get_request_id(ri)
- send(
- {proxy, node},
+ send(
+ {proxy, node},
+ {
{
- {
- call_id,
- %Invocation{
- request_id: req_id,
- registration_id: id,
- options: dets,
- arg_list: al,
- arg_kw: akw
- },
- {self(), Node.self()}
+ call_id,
+ %Invocation{
+ request_id: req_id,
+ registration_id: id,
+ options: dets,
+ arg_list: al,
+ arg_kw: akw
},
- pid
- }
+ {self(), Node.self()}
+ },
+ pid
+ }
+ )
+
+ %SL{sl | data: %Sess{data | request_id: req_id}}
+ else
+ {:error, :no_live_callees} ->
+ send_to_peer(
+ Caller.call_error(%Error{
+ request_id: call_id,
+ error: "wamp.error.no_callees",
+ details: %{procedure: proc}
+ }),
+ tt,
+ t
)
- {%SL{sl | data: %Sess{data | request_id: req_id}},
- [{:next_event, :internal, :transition}]}
+ sl
- {:error, :no_live_callees} ->
- {%SL{sl | data: %Sess{data | error: "wamp.error.no_callees_registered"}},
- [{:next_event, :internal, :transition}, {:next_event, :internal, :error}]}
+ :not_found ->
+ send_to_peer(
+ Caller.call_error(%Error{
+ request_id: call_id,
+ error: "wamp.error.no_registration",
+ details: %{procedure: proc}
+ }),
+ tt,
+ t
+ )
+
+ sl
end
- {:ok, data, actions}
+ {:ok, data, [{:next_event, :internal, :transition}]}
end
@impl true
%SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
data
) do
- Logger.info("Handling yield #{id} - #{inspect(arg_kw)}")
-
- case Enum.find(inv, fn
- {_, %Invocation{request_id: ^id}, _} -> true
- _ -> false
- end) do
- {call_id, _, {pid, node}} ->
- send(
- {proxy, node},
- {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
- )
+ {call_id, _, {pid, node}} =
+ Enum.find(inv, fn
+ {_, %Invocation{request_id: ^id}, _} -> true
+ _ -> false
+ end)
- nil ->
- Logger.error("No invocation in #{inspect(inv)} for #{id} #{inspect(arg_kw)}")
- :noop
- end
+ send(
+ {proxy, node},
+ {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
+ )
inv =
Enum.filter(inv, fn
@impl true
def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
- Logger.info("Handling Result #{inspect(e)}")
send_to_peer(Dealer.result(e), tt, t)
{:ok, data, []}
end