alias Wampex.Roles.Peer.{Challenge, Welcome}
alias Wampex.Router
alias __MODULE__, as: Sess
+ alias Broker.{Subscribed, Published, Event}
alias Dealer.{Invocation, Registered, Result, Unregistered}
@enforce_keys [:transport, :transport_pid]
:realm,
:name,
:goodbye,
+ :peer_information,
roles: [Peer, Broker, Dealer],
registrations: [],
subscriptions: [],
@handle_message,
_,
_,
- %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
+ %SL{data: %Sess{message: msg, roles: roles}} = data
) do
debug("Handling Message #{inspect(msg)}")
{actions, id, response} = handle_message(msg, roles)
- {%SL{data: sess} = data, response} = maybe_update_response(data, response)
- {requests, actions} = resp = handle_response(id, actions, requests, response)
- debug("Response: #{inspect(resp)}")
- {:ok, %SL{data: %Sess{sess | requests: requests}}, actions}
+ {data, response} = maybe_update_response(data, response)
+ debug("Response: #{inspect(response)}")
+ {:ok, data, actions}
end
@impl true
_,
@hello,
%SL{
- data: %Sess{id: id, transport: tt, transport_pid: t}
+ data:
+ %Sess{
+ id: id,
+ transport: tt,
+ transport_pid: t,
+ hello: {:hello, realm, dets}
+ } = data
} = sl
) do
# TODO check for authentication request
tt.send_request(t, Peer.welcome(%Welcome{session_id: id}))
+
+ {:ok, %SL{sl | data: %Sess{data | realm: realm, peer_information: dets}},
+ [{:next_event, :internal, :transition}]}
+ end
+
+ @impl true
+ def handle_resource(
+ @handle_subscribe,
+ _,
+ @subscribe,
+ %SL{
+ data:
+ %Sess{
+ transport: tt,
+ transport_pid: t,
+ subscriptions: subs,
+ realm: realm,
+ subscribe: {:subscribe, ri, opts, topic},
+ db: db
+ } = data
+ } = sl
+ ) do
+ id = get_global_id()
+ key = "#{realm}:#{topic}"
+
+ debug("Handling Subscription #{inspect(key)} #{inspect(opts)}")
+
+ case opts do
+ %{"match" => "wildcard"} ->
+ Logger.info("Putting wildcard")
+ ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
+
+ _ ->
+ Logger.info("Putting exact")
+ ClusterKV.put(db, key, {id, {self(), Node.self()}})
+ end
+
+ tt.send_request(t, Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}))
+
+ {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}},
+ [{:next_event, :internal, :transition}]}
+ end
+
+ @impl true
+ def handle_resource(
+ @handle_publish,
+ _,
+ @publish,
+ %SL{
+ data: %Sess{
+ proxy: proxy,
+ realm: realm,
+ transport: tt,
+ transport_pid: t,
+ db: db,
+ publish: {:publish, id, opts, topic, arg_l, arg_kw}
+ }
+ } = sl
+ ) do
+ pub_id = get_global_id()
+ key = "#{realm}:#{topic}"
+
+ {_, _, subs} =
+ {db, key, []}
+ |> get_subscribers()
+ |> get_prefix()
+ |> get_wildcard(realm)
+
+ subs = Enum.uniq(subs)
+ Logger.info("Subscribers: #{inspect(subs)}")
+
+ Enum.each(subs, fn {id, {pid, node}} ->
+ send(
+ {proxy, node},
+ {%Event{
+ subscription_id: id,
+ publication_id: pub_id,
+ arg_list: arg_l,
+ arg_kw: arg_kw,
+ options: opts
+ }, pid}
+ )
+ end)
+
{:ok, sl, [{:next_event, :internal, :transition}]}
end
+ @impl true
+ def handle_resource(
+ @handle_unregister,
+ _,
+ @unregister,
+ %SL{
+ data:
+ %Sess{
+ db: db,
+ registrations: regs,
+ unregister: {:unregister, request_id, registration_id}
+ } = data
+ } = sl
+ ) do
+ {id, proc} =
+ reg =
+ Enum.find(regs, fn
+ {^registration_id, proc} -> true
+ _ -> false
+ end)
+
+ filter = fn {id, values}, value ->
+ {id, List.delete(values, value)}
+ end
+
+ ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
+ {:ok, %SL{sl | data: %Sess{data | registrations: List.delete(regs, reg)}}}
+ end
+
@impl true
def handle_resource(
@handle_register,
transport_pid: t,
registrations: regs,
db: db,
+ realm: realm,
register: {:register, rid, opts, procedure}
} = data
} = sl
) do
- debug("Handling Registration #{inspect(procedure)}")
+ key = "#{realm}:#{procedure}"
+ debug("Handling Registration #{inspect(key)}")
id = get_global_id()
- ClusterKV.put(db, procedure, {id, {self(), Node.self()}})
+ ClusterKV.put(db, key, {id, {self(), Node.self()}})
tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id}))
{:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
proxy: proxy,
transport: tt,
transport_pid: t,
- call: {:call, ri, dets, proc, al, akw}
+ realm: realm,
+ request_id: ri,
+ call: {:call, call_id, dets, proc, al, akw}
} = data
} = sl
) do
- {key, [{id, {pid, node}} | _]} = ClusterKV.get(db, proc, 500)
+ key = "#{realm}:#{proc}"
+ {_key, callees} = ClusterKV.get(db, key, 500)
+
+ req_id = get_request_id(ri)
+
+ actions =
+ case get_live_callee(proxy, callees) do
+ {id, {pid, node}} ->
+ send(
+ {proxy, node},
+ {
+ {
+ call_id,
+ %Invocation{
+ request_id: req_id,
+ registration_id: id,
+ options: dets,
+ arg_list: al,
+ arg_kw: akw
+ },
+ {self(), Node.self()}
+ },
+ pid
+ }
+ )
+
+ [{:next_event, :internal, :transition}]
+
+ {:error, :no_live_callees} = er ->
+ [{:next_event, :internal, er}]
+ end
- Logger.info(
- "Sending Invocation to #{inspect(proxy)} at node #{inspect(node)} with pid #{inspect(pid)}"
- )
+ {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions}
+ end
- send(
- {proxy, node},
- {
- {
- %Invocation{
- request_id: ri,
- registration_id: id,
- options: dets,
- arg_list: al,
- arg_kw: akw
- },
- {self(), Node.self()}
- },
- pid
- }
- )
+ defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
- {:ok, sl, [{:next_event, :internal, :transition}]}
+ defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
+ case GenServer.call({proxy, node}, {:is_up, pid}) do
+ true -> c
+ false -> get_live_callee(proxy, t)
+ end
end
@impl true
%SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
data
) do
- {_, {pid, node}} =
+ {call_id, _, {pid, node}} =
Enum.find(inv, fn
- {%Invocation{request_id: ^id}, _} -> true
+ {_, %Invocation{request_id: ^id}, _} -> true
_ -> false
end)
send(
{proxy, node},
- {%Result{request_id: id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
+ {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
)
inv =
Enum.filter(inv, fn
- {%Invocation{request_id: ^id}, _} -> false
+ {_, %Invocation{request_id: ^id}, _} -> false
_ -> false
end)
[{:next_event, :internal, :transition}]}
end
- @impl true
- def handle_resource(
- @handle_subscribe,
- _,
- @subscribe,
- %SL{
- data: %Sess{transport: tt, transport_pid: t, subscribe: s}
- } = sl
- ) do
- # TODO check for authentication request
- debug("Handling Subscription #{inspect(s)}")
- {:ok, sl, [{:next_event, :internal, :transition}]}
- end
-
@impl true
def handle_resource(@handle_abort, _, @abort, data) do
Logger.warn("Aborting: #{data.data.error}")
@goodbye,
%SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
) do
- tt.send_message(t, Peer.goodbye(goodbye))
+ send_to_peer(Peer.goodbye(goodbye), tt, t)
{:ok, data, []}
end
@impl true
def handle_info(
- {%Invocation{} = e, from},
+ {call_id, %Invocation{} = e, from},
_,
%SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
) do
Logger.info("Received Invocation #{inspect(e)}")
- tt.send_request(t, Dealer.invocation(e))
- {:ok, %SL{data | data: %Sess{data.data | invocations: [{e, from} | inv]}}, []}
+ send_to_peer(Dealer.invocation(e), tt, t)
+ {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
+ end
+
+ @impl true
+ def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
+ send_to_peer(Broker.event(e), tt, t)
+ {:ok, data, []}
end
@impl true
def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
- tt.send_request(t, Dealer.result(e))
+ send_to_peer(Dealer.result(e), tt, t)
{:ok, data, []}
end
{:ok, data, []}
end
- defp handle_response(nil, actions, requests, _), do: {requests, actions}
-
- defp handle_response(id, actions, requests, response) do
- Enum.reduce_while(requests, {requests, actions}, fn
- {^id, nil}, _ ->
- {:halt, {remove_request(id, requests), actions}}
+ @impl true
+ def handle_termination(reason, _, %SL{
+ data: %Sess{db: db, registrations: regs, subscriptions: subs}
+ }) do
+ filter = fn {id, values}, value ->
+ {id, List.delete(values, value)}
+ end
- {^id, from}, _ ->
- {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
+ me = {self(), Node.self()}
- {_, _}, acc ->
- {:cont, acc}
+ Enum.each(regs, fn {id, proc} ->
+ ClusterKV.update(db, proc, {id, me}, filter)
end)
end
- defp get_global_id do
- Enum.random(0..@max_id)
+ defp get_subscribers({db, key, acc}) do
+ case ClusterKV.get(db, key, 500) do
+ {_, subscribers} -> {db, key, acc ++ subscribers}
+ _ -> {db, key, acc}
+ end
end
- defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
- send(from, {:progress, arg_l, arg_kw})
- []
- end
+ defp get_prefix({db, key, acc}) do
+ case ClusterKV.prefix(db, key, ".", 2, 500) do
+ {_, subscribers} ->
+ {db, key, acc ++ subscribers}
- defp get_response_action(from, response), do: {:reply, from, response}
+ l when is_list(l) ->
+ {db, key, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
- defp maybe_update_response(data, {:update, key, resp}) do
- {%SL{data | data: Map.put(data.data, key, resp)}, resp}
+ _ ->
+ {db, key, acc}
+ end
end
- defp maybe_update_response(data, resp), do: {data, resp}
+ defp get_wildcard({db, key, acc}, realm) do
+ case ClusterKV.wildcard(db, realm, key, ".", ":", "") do
+ l when is_list(l) ->
+ {db, key, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
- defp do_send(r_id, tt, t, message) do
- {request_id, message} = maybe_inject_request_id(r_id, message)
- tt.send_request(t, remove_nil_values(message))
- request_id
+ _ ->
+ {db, key, acc}
+ end
end
- defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
-
- defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
-
- defp maybe_inject_request_id(r_id, message) do
- request_id = get_request_id(r_id)
- {request_id, List.insert_at(message, 1, request_id)}
+ defp send_to_peer(msg, transport, pid) do
+ transport.send_request(pid, remove_nil_values(msg))
end
- defp get_request_id(current_id) when current_id == @max_id do
- 1
+ defp get_global_id do
+ Enum.random(0..@max_id)
end
- defp get_request_id(current_id) do
- current_id + 1
+ defp maybe_update_response(data, {:update, key, resp}) do
+ {%SL{data | data: Map.put(data.data, key, resp)}, resp}
end
- defp remove_request(id, requests) do
- Enum.filter(requests, fn
- {^id, _} -> false
- {_id, _} -> true
- end)
- end
+ defp maybe_update_response(data, resp), do: {data, resp}
- defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}
+ defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
- defp send_message_queue(r_id, mq, tt, t, reqs) do
- mq
- |> Enum.reverse()
- |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
- r = do_send(id, tt, t, request)
- {r, [{r, from} | requests]}
- end)
+ defp get_request_id(current_id) when current_id == @max_id do
+ 1
end
- def remove_cast_requests([]), do: []
-
- def remove_cast_requests(requests) do
- Enum.filter(requests, fn
- {_, nil} -> false
- {_, _} -> true
- end)
+ defp get_request_id(current_id) do
+ current_id + 1
end
defp handle_message(msg, roles) do