@register "Register"
@unregister "Unregister"
@subscribe "Subscribe"
- @unsubscribe "Unsubscribe"
+ # @unsubscribe "Unsubscribe"
@publish "Publish"
- @error "Error"
+ # @error "Error"
@abort "Abort"
@goodbye "Goodbye"
- @event "Cancel"
## Resources
@handle_init "HandleInit"
@handle_register "HandleRegister"
@handle_unregister "HandleUnregister"
@handle_subscribe "HandleSubscribe"
- @handle_unsubscribe "HandleUnsubscribe"
+ # @handle_unsubscribe "HandleUnsubscribe"
@handle_publish "HandlePublish"
- @handle_interrupt "HandleError"
+ # @handle_interrupt "HandleInterrupt"
@handle_abort "HandleAbort"
@handle_goodbye "HandleGoodbye"
- @handle_cancel "HandleCancel"
+ # @handle_cancel "HandleCancel"
@impl true
def handle_resource(
@authenticate,
%SL{
data: %Sess{
+ id: session_id,
transport: tt,
transport_pid: t,
authentication: auth,
challenge: challenge,
- authenticate: {:authenticate, sig, dets}
+ authenticate: {:authenticate, sig, _dets}
}
} = sl
) do
ch = JSON.deserialize!(challenge.challenge)
- session_id = get_in(ch, ["session"])
authid = get_in(ch, ["authid"])
authrole = get_in(ch, ["authrole"])
authmethod = get_in(ch, ["authmethod"])
} = sl
) do
id = get_global_id()
- key = "#{realm}:#{topic}"
-
- debug("Handling Subscription #{inspect(key)} #{inspect(opts)}")
case opts do
%{"match" => "wildcard"} ->
- Logger.info("Putting wildcard")
ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
_ ->
- Logger.info("Putting exact")
- ClusterKV.put(db, key, {id, {self(), Node.self()}})
+ ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
end
send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
} = sl
) do
pub_id = get_global_id()
- key = "#{realm}:#{topic}"
{_, _, subs} =
- {db, key, []}
+ {db, {realm, topic}, []}
|> get_subscribers()
|> get_prefix()
- |> get_wildcard(realm)
+ |> get_wildcard()
subs = Enum.uniq(subs)
- Logger.info("Subscribers: #{inspect(subs)}")
Enum.each(subs, fn {id, {pid, node}} ->
send(
transport: tt,
transport_pid: t,
registrations: regs,
+ realm: realm,
unregister: {:unregister, request_id, registration_id}
} = data
} = sl
) do
- regs = remove_registration(regs, registration_id, db)
+ regs = remove_registration(regs, realm, registration_id, db)
send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
{:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
registrations: regs,
db: db,
realm: realm,
- register: {:register, rid, opts, procedure}
+ register: {:register, rid, _opts, procedure}
} = data
} = sl
) do
- key = "#{realm}:#{procedure}"
- debug("Handling Registration #{inspect(key)}")
id = get_global_id()
- ClusterKV.put(db, key, {id, {self(), Node.self()}})
+ ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t)
{:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
%Sess{
db: db,
proxy: proxy,
- transport: tt,
- transport_pid: t,
+ transport: _tt,
+ transport_pid: _t,
realm: realm,
request_id: ri,
call: {:call, call_id, dets, proc, al, akw}
} = data
} = sl
) do
- key = "#{realm}:#{proc}"
- {_key, callees} = ClusterKV.get(db, key, 500)
+ {_key, callees} = ClusterKV.get(db, realm, proc, 500)
{data, actions} =
case get_live_callee(proxy, callees) do
_,
%SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
) do
- Logger.info("Received Invocation #{inspect(e)}")
send_to_peer(Dealer.invocation(e), tt, t)
{:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
end
@impl true
def handle_info(event, state, data) do
- Logger.info("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
+ Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
{:ok, data, []}
end
transport: tt,
transport_pid: t,
registrations: regs,
- subscriptions: subs
+ realm: realm,
+ subscriptions: _subs
}
}) do
- Enum.each(regs, fn {id, proc} ->
- remove_registration(regs, id, db)
+ Logger.warn("Router session terminating #{inspect(reason)}")
+
+ Enum.each(regs, fn {id, _proc} ->
+ remove_registration(regs, realm, id, db)
end)
send_to_peer(
)
end
- defp remove_registration(regs, registration_id, db) do
+ defp remove_registration(regs, realm, registration_id, db) do
{id, proc} =
reg =
Enum.find(regs, fn
- {^registration_id, proc} -> true
+ {^registration_id, _proc} -> true
_ -> false
end)
{id, List.delete(values, value)}
end
- ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
+ ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
List.delete(regs, reg)
end
- defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
+ defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
case GenServer.call({proxy, node}, {:is_up, pid}) do
end
end
- defp get_subscribers({db, key, acc}) do
- case ClusterKV.get(db, key, 500) do
- {_, subscribers} -> {db, key, acc ++ subscribers}
- _ -> {db, key, acc}
+ defp get_subscribers({db, {keyspace, key}, acc}) do
+ case ClusterKV.get(db, keyspace, key, 500) do
+ {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
+ _ -> {db, {keyspace, key}, acc}
end
end
- defp get_prefix({db, key, acc}) do
- case ClusterKV.prefix(db, key, ".", 2, 500) do
- {_, subscribers} ->
- {db, key, acc ++ subscribers}
-
- l when is_list(l) ->
- {db, key, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
-
- _ ->
- {db, key, acc}
- end
+ defp get_prefix({db, {keyspace, key}, acc}) do
+ l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
+ {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
end
- defp get_wildcard({db, key, acc}, realm) do
- case ClusterKV.wildcard(db, realm, key, ".", ":", "") do
- l when is_list(l) ->
- {db, key, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
-
- _ ->
- {db, key, acc}
- end
+ defp get_wildcard({db, {keyspace, key}, acc}) do
+ l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
+ {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
end
defp send_to_peer(msg, transport, pid) do