--- /dev/null
+defmodule Wampex.Router.Realms.Session do
+ @moduledoc false
+
+ @max_id 9_007_199_254_740_992
+
+ def register(db, realm, procedure, val) do
+ ClusterKV.put(db, realm, procedure, val)
+ ClusterKV.put(db, realm, "#{procedure}:callee_index", 0)
+ end
+
+ def unregister(db, realm, {id, _from} = value, regs) do
+ {_id, proc} = reg = get_registration(id, regs)
+ ClusterKV.update(db, realm, proc, value, &upsert_remove/2)
+ List.delete(regs, reg)
+ end
+
+ def callees(db, realm, procedure) do
+ with {_key, callees} <- ClusterKV.get(db, realm, procedure, 500),
+ {_key, [callee_index]} <- ClusterKV.get(db, realm, "#{procedure}:callee_index") do
+ {callees, callee_index}
+ end
+ end
+
+ def round_robin(db, realm, procedure, len) do
+ ClusterKV.update(db, realm, "#{procedure}:callee_index", len, &update_round_robin/2)
+ end
+
+ def subscribe(db, realm, topic, val, %{"match" => "wildcard"}) do
+ put_wildcard(db, realm, topic, val)
+ true
+ end
+
+ def subscribe(db, realm, topic, val, _) do
+ ClusterKV.put(db, realm, topic, val)
+ false
+ end
+
+ def unsubscribe(db, realm, {id, _from} = value, subscriptions) do
+ {_id, topic, wc} = sub = get_subscription(id, subscriptions)
+
+ {key, filter} =
+ case wc do
+ true ->
+ key = get_wildcard_key(topic)
+ {key, &upsert_remove/2}
+
+ false ->
+ {topic, &upsert_remove/2}
+ end
+
+ ClusterKV.update(db, realm, key, value, filter)
+ remove_val(sub, subscriptions)
+ end
+
+ def subscriptions(db, realm, topic) do
+ {_, _, subs} =
+ {db, {realm, topic}, []}
+ |> get_subscribers()
+ |> get_prefix()
+ |> get_wildcard()
+
+ Enum.uniq(subs)
+ end
+
+ def get_id do
+ Enum.random(0..@max_id)
+ end
+
+ def get_request_id(current_id) when current_id == @max_id do
+ 1
+ end
+
+ def get_request_id(current_id) do
+ current_id + 1
+ end
+
+ defp update_round_robin({id, [values]}, value) do
+ case values + 1 do
+ ni when ni < value -> {id, [ni]}
+ _ -> {id, [0]}
+ end
+ end
+
+ defp get_subscribers({db, {keyspace, key}, acc}) do
+ case ClusterKV.get(db, keyspace, key, 500) do
+ {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
+ _ -> {db, {keyspace, key}, acc}
+ end
+ end
+
+ defp get_prefix({db, {keyspace, key}, acc}) do
+ l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
+ {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
+ end
+
+ defp get_wildcard({db, {keyspace, key}, acc}) do
+ l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
+ {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
+ end
+
+ defp get_registration(id, regs) do
+ Enum.find(regs, fn
+ {^id, _proc} -> true
+ _ -> false
+ end)
+ end
+
+ defp get_subscription(id, subscriptions) do
+ Enum.find(subscriptions, fn
+ {^id, _topic, _} -> true
+ _ -> false
+ end)
+ end
+
+ defp remove_val(val, values) do
+ List.delete(values, val)
+ end
+
+ defp upsert_remove({id, values}, value), do: {id, remove_val(value, values)}
+
+ defp put_wildcard(db, realm, topic, val), do: ClusterKV.put_wildcard(db, realm, topic, val, ".", ":", "")
+ defp get_wildcard_key(topic), do: ClusterKV.get_wildcard_key(topic, ".", ":", "")
+end
"""
use StatesLanguage, data: "priv/router.json"
- @max_id 9_007_199_254_740_992
-
alias __MODULE__, as: Sess
alias StatesLanguage, as: SL
alias Wampex.Roles.{Broker, Caller, Dealer, Peer}
alias Broker.{Subscribed, Published, Event, Unsubscribed}
alias Dealer.{Invocation, Registered, Result, Unregistered}
alias Router.{Authentication, Realms}
+ alias Realms.Session, as: RealmSession
@enforce_keys [:transport, :transport_pid]
defstruct [
# @handle_interrupt "HandleInterrupt"
@handle_abort "HandleAbort"
@handle_goodbye "HandleGoodbye"
- # @handle_cancel "HandleCancel"
-
- def get_global_id do
- Enum.random(0..@max_id)
- end
+ # @handle_cancel "HandleCancel"
@impl true
def handle_resource(
) do
debug("Init")
- {:ok, %SL{data | data: %Sess{data.data | id: get_global_id()}}, [{:next_event, :internal, :transition}]}
+ {:ok, %SL{data | data: %Sess{data.data | id: RealmSession.get_id()}}, [{:next_event, :internal, :transition}]}
end
@impl true
} = data
} = sl
) do
- id = get_global_id()
+ id = RealmSession.get_id()
- wc =
- case opts do
- %{"match" => "wildcard"} ->
- ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
- true
-
- _ ->
- ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}})
- false
- end
+ wc = RealmSession.subscribe(db, realm, topic, {id, {self(), Node.self()}}, opts)
send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t)
}
} = sl
) do
- subs = remove_subscription(subs, subscription_id, realm, db)
+ subs = RealmSession.unsubscribe(db, realm, {subscription_id, {self(), Node.self()}}, subs)
send_to_peer(Broker.unsubscribed(%Unsubscribed{request_id: rid}), tt, t)
}
} = sl
) do
- pub_id = get_global_id()
+ pub_id = RealmSession.get_id()
- {_, _, subs} =
- {db, {realm, topic}, []}
- |> get_subscribers()
- |> get_prefix()
- |> get_wildcard()
-
- subs = Enum.uniq(subs)
+ subs = RealmSession.subscriptions(db, realm, topic)
Enum.each(subs, fn {id, {pid, node}} ->
send(
} = data
} = sl
) do
- regs = remove_registration(regs, realm, registration_id, db)
+ regs = RealmSession.unregister(db, realm, {registration_id, {self(), Node.self()}}, regs)
send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t)
{:ok, %SL{sl | data: %Sess{data | registrations: regs}}}
} = data
} = sl
) do
- id = get_global_id()
- ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}})
+ id = RealmSession.get_id()
+ RealmSession.register(db, realm, procedure, {id, {self(), Node.self()}})
regd = %Registered{request_id: rid, registration_id: id}
send_to_peer(Dealer.registered(regd), tt, t)
} = sl
) do
data =
- with {_key, callees} <- ClusterKV.get(db, realm, proc, 500),
- {id, {pid, node}} <- get_live_callee(proxy, callees) do
- req_id = get_request_id(ri)
+ with {callees, index} <- RealmSession.callees(db, realm, proc),
+ {id, {pid, node}} <- get_live_callee(proxy, callees, index, 3) do
+ req_id = RealmSession.get_request_id(ri)
dets = Map.put(dets, "procedure", proc)
send(
}
)
+ RealmSession.round_robin(db, realm, proc, length(callees))
+
%SL{sl | data: %Sess{data | request_id: req_id}}
else
{:error, :no_live_callees} ->
{:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]}
end
+ @impl true
+ def handle_info({:EXIT, t, reason}, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
+ debug("Transport #{tt} exited because #{inspect(reason)}: Shutting down")
+ {:ok, data, [{:next_event, :internal, :abort}]}
+ end
+
@impl true
def handle_info(event, state, data) do
Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}")
subscriptions: subs
}
}) do
- Logger.warn("Router session terminating #{inspect(reason)}")
+ Logger.debug("Router session terminating #{inspect(reason)}")
Enum.each(regs, fn {id, _proc} ->
- remove_registration(regs, realm, id, db)
+ RealmSession.unregister(db, realm, {id, {self(), Node.self()}}, regs)
end)
Enum.each(subs, fn {id, _proc, _} ->
- remove_subscription(subs, id, realm, db)
+ RealmSession.unsubscribe(db, realm, {id, {self(), Node.self()}}, subs)
end)
send_to_peer(
end
end
- defp remove_subscription(subs, subscription_id, realm, db) do
- {id, topic, wc} =
- sub =
- Enum.find(subs, fn
- {^subscription_id, _topic, _} -> true
- _ -> false
- end)
-
- {key, filter} =
- case wc do
- true ->
- key = ClusterKV.get_wildcard_key(topic, ".", ":", "")
-
- {key,
- fn {id, values}, value ->
- {id,
- Enum.filter(values, fn
- {_, ^value} -> false
- _ -> true
- end)}
- end}
-
- false ->
- {topic,
- fn {id, values}, value ->
- {id, List.delete(values, value)}
- end}
- end
-
- ClusterKV.update(db, realm, key, {id, {self(), Node.self()}}, filter)
- List.delete(subs, sub)
- end
-
- defp remove_registration(regs, realm, registration_id, db) do
- {id, proc} =
- reg =
- Enum.find(regs, fn
- {^registration_id, _proc} -> true
- _ -> false
- end)
-
- filter = fn {id, values}, value ->
- {id, List.delete(values, value)}
- end
-
- ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter)
- List.delete(regs, reg)
- end
+ defp get_live_callee(_proxy, [], _index, 0), do: {:error, :no_live_callees}
- defp get_live_callee(_proxy, []), do: {:error, :no_live_callees}
+ defp get_live_callee(proxy, callees, index, tries) do
+ {_id, {pid, node}} = c = Enum.at(callees, index)
- defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
case GenServer.call({proxy, node}, {:is_up, pid}) do
- true -> c
- false -> get_live_callee(proxy, t)
- end
- end
-
- defp get_subscribers({db, {keyspace, key}, acc}) do
- case ClusterKV.get(db, keyspace, key, 500) do
- {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers}
- _ -> {db, {keyspace, key}, acc}
- end
- end
+ true ->
+ c
- defp get_prefix({db, {keyspace, key}, acc}) do
- l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500)
- {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
- end
+ false ->
+ index =
+ case index + 1 do
+ ni when ni < length(callees) -> ni
+ _ -> 0
+ end
- defp get_wildcard({db, {keyspace, key}, acc}) do
- l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "")
- {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
+ get_live_callee(proxy, callees, index, tries - 1)
+ end
end
defp send_to_peer(msg, transport, pid) do
Enum.reverse(message)
end
- defp get_request_id(current_id) when current_id == @max_id do
- 1
- end
-
- defp get_request_id(current_id) do
- current_id + 1
- end
-
defp handle_message(msg, roles) do
Enum.reduce_while(roles, nil, fn r, _ ->
try do