From: Christopher Date: Tue, 17 Mar 2020 17:48:17 +0000 (-0500) Subject: use keyspace for ClusterKV X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=2f7eda892d13600b6eae9304b5973d6e0f464a65;p=wampex_client.git use keyspace for ClusterKV --- diff --git a/lib/roles/peer.ex b/lib/roles/peer.ex index 13c581e..8bb7911 100644 --- a/lib/roles/peer.ex +++ b/lib/roles/peer.ex @@ -33,7 +33,7 @@ defmodule Wampex.Roles.Peer do @type t :: %__MODULE__{ session_id: integer(), - options: %{} + options: map() } end diff --git a/lib/router/session.ex b/lib/router/session.ex index db54c40..f053c7a 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -90,12 +90,11 @@ defmodule Wampex.Router.Session do @register "Register" @unregister "Unregister" @subscribe "Subscribe" - @unsubscribe "Unsubscribe" + # @unsubscribe "Unsubscribe" @publish "Publish" - @error "Error" + # @error "Error" @abort "Abort" @goodbye "Goodbye" - @event "Cancel" ## Resources @handle_init "HandleInit" @@ -108,12 +107,12 @@ defmodule Wampex.Router.Session do @handle_register "HandleRegister" @handle_unregister "HandleUnregister" @handle_subscribe "HandleSubscribe" - @handle_unsubscribe "HandleUnsubscribe" + # @handle_unsubscribe "HandleUnsubscribe" @handle_publish "HandlePublish" - @handle_interrupt "HandleError" + # @handle_interrupt "HandleInterrupt" @handle_abort "HandleAbort" @handle_goodbye "HandleGoodbye" - @handle_cancel "HandleCancel" + # @handle_cancel "HandleCancel" @impl true def handle_resource( @@ -225,16 +224,16 @@ defmodule Wampex.Router.Session do @authenticate, %SL{ data: %Sess{ + id: session_id, transport: tt, transport_pid: t, authentication: auth, challenge: challenge, - authenticate: {:authenticate, sig, dets} + authenticate: {:authenticate, sig, _dets} } } = sl ) do ch = JSON.deserialize!(challenge.challenge) - session_id = get_in(ch, ["session"]) authid = get_in(ch, ["authid"]) authrole = get_in(ch, ["authrole"]) authmethod = get_in(ch, ["authmethod"]) @@ -308,18 +307,13 @@ defmodule Wampex.Router.Session do } = sl ) do id = get_global_id() - key = "#{realm}:#{topic}" - - debug("Handling Subscription #{inspect(key)} #{inspect(opts)}") case opts do %{"match" => "wildcard"} -> - Logger.info("Putting wildcard") ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "") _ -> - Logger.info("Putting exact") - ClusterKV.put(db, key, {id, {self(), Node.self()}}) + ClusterKV.put(db, realm, topic, {id, {self(), Node.self()}}) end send_to_peer(Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}), tt, t) @@ -345,16 +339,14 @@ defmodule Wampex.Router.Session do } = sl ) do pub_id = get_global_id() - key = "#{realm}:#{topic}" {_, _, subs} = - {db, key, []} + {db, {realm, topic}, []} |> get_subscribers() |> get_prefix() - |> get_wildcard(realm) + |> get_wildcard() subs = Enum.uniq(subs) - Logger.info("Subscribers: #{inspect(subs)}") Enum.each(subs, fn {id, {pid, node}} -> send( @@ -392,11 +384,12 @@ defmodule Wampex.Router.Session do transport: tt, transport_pid: t, registrations: regs, + realm: realm, unregister: {:unregister, request_id, registration_id} } = data } = sl ) do - regs = remove_registration(regs, registration_id, db) + regs = remove_registration(regs, realm, registration_id, db) send_to_peer(Dealer.unregistered(%Unregistered{request_id: request_id}), tt, t) {:ok, %SL{sl | data: %Sess{data | registrations: regs}}} @@ -415,14 +408,12 @@ defmodule Wampex.Router.Session do registrations: regs, db: db, realm: realm, - register: {:register, rid, opts, procedure} + register: {:register, rid, _opts, procedure} } = data } = sl ) do - key = "#{realm}:#{procedure}" - debug("Handling Registration #{inspect(key)}") id = get_global_id() - ClusterKV.put(db, key, {id, {self(), Node.self()}}) + ClusterKV.put(db, realm, procedure, {id, {self(), Node.self()}}) send_to_peer(Dealer.registered(%Registered{request_id: rid, registration_id: id}), tt, t) {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}}, @@ -439,16 +430,15 @@ defmodule Wampex.Router.Session do %Sess{ db: db, proxy: proxy, - transport: tt, - transport_pid: t, + transport: _tt, + transport_pid: _t, realm: realm, request_id: ri, call: {:call, call_id, dets, proc, al, akw} } = data } = sl ) do - key = "#{realm}:#{proc}" - {_key, callees} = ClusterKV.get(db, key, 500) + {_key, callees} = ClusterKV.get(db, realm, proc, 500) {data, actions} = case get_live_callee(proxy, callees) do @@ -548,7 +538,6 @@ defmodule Wampex.Router.Session do _, %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data ) do - Logger.info("Received Invocation #{inspect(e)}") send_to_peer(Dealer.invocation(e), tt, t) {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []} end @@ -573,7 +562,7 @@ defmodule Wampex.Router.Session do @impl true def handle_info(event, state, data) do - Logger.info("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}") + Logger.warn("Got event #{inspect(event)} in state #{state} with data #{inspect(data)}") {:ok, data, []} end @@ -584,11 +573,14 @@ defmodule Wampex.Router.Session do transport: tt, transport_pid: t, registrations: regs, - subscriptions: subs + realm: realm, + subscriptions: _subs } }) do - Enum.each(regs, fn {id, proc} -> - remove_registration(regs, id, db) + Logger.warn("Router session terminating #{inspect(reason)}") + + Enum.each(regs, fn {id, _proc} -> + remove_registration(regs, realm, id, db) end) send_to_peer( @@ -598,11 +590,11 @@ defmodule Wampex.Router.Session do ) end - defp remove_registration(regs, registration_id, db) do + defp remove_registration(regs, realm, registration_id, db) do {id, proc} = reg = Enum.find(regs, fn - {^registration_id, proc} -> true + {^registration_id, _proc} -> true _ -> false end) @@ -610,11 +602,11 @@ defmodule Wampex.Router.Session do {id, List.delete(values, value)} end - ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter) + ClusterKV.update(db, realm, proc, {id, {self(), Node.self()}}, filter) List.delete(regs, reg) end - defp get_live_callee(proxy, []), do: {:error, :no_live_callees} + defp get_live_callee(_proxy, []), do: {:error, :no_live_callees} defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do case GenServer.call({proxy, node}, {:is_up, pid}) do @@ -623,34 +615,21 @@ defmodule Wampex.Router.Session do end end - defp get_subscribers({db, key, acc}) do - case ClusterKV.get(db, key, 500) do - {_, subscribers} -> {db, key, acc ++ subscribers} - _ -> {db, key, acc} + defp get_subscribers({db, {keyspace, key}, acc}) do + case ClusterKV.get(db, keyspace, key, 500) do + {_, subscribers} -> {db, {keyspace, key}, acc ++ subscribers} + _ -> {db, {keyspace, key}, acc} end end - defp get_prefix({db, key, acc}) do - case ClusterKV.prefix(db, key, ".", 2, 500) do - {_, subscribers} -> - {db, key, acc ++ subscribers} - - l when is_list(l) -> - {db, key, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)} - - _ -> - {db, key, acc} - end + defp get_prefix({db, {keyspace, key}, acc}) do + l = ClusterKV.prefix(db, keyspace, key, ".", 2, 500) + {db, {keyspace, key}, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)} end - defp get_wildcard({db, key, acc}, realm) do - case ClusterKV.wildcard(db, realm, key, ".", ":", "") do - l when is_list(l) -> - {db, key, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)} - - _ -> - {db, key, acc} - end + defp get_wildcard({db, {keyspace, key}, acc}) do + l = ClusterKV.wildcard(db, keyspace, key, ".", ":", "") + {db, {keyspace, key}, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)} end defp send_to_peer(msg, transport, pid) do diff --git a/lib/wampex.ex b/lib/wampex.ex index 4bbddba..7b0565e 100644 --- a/lib/wampex.ex +++ b/lib/wampex.ex @@ -26,14 +26,13 @@ defmodule Wampex do @type register :: {:register, request_id :: integer(), details :: map(), procedure :: String.t()} - @type unregister :: - {:unregister, request_id :: integer(), id :: integer()} + @type unregister :: {:unregister, request_id :: integer(), registration_id :: integer()} @type unsubscribe :: {:unsubscribe, request_id :: integer(), id :: integer()} @type subscribe :: - {:subscribe, request_id :: integer(), topic :: String.t(), details :: map()} + {:subscribe, request_id :: integer(), details :: map(), topic :: String.t()} @type invocation :: {:invocation, request_id :: integer(), registration_id :: integer(), details :: map(), arg_list :: arg_list(), arg_keywords :: arg_keyword()} @@ -44,9 +43,8 @@ defmodule Wampex do {:error, reason :: binary()} | {:error, type :: integer(), error :: binary(), arg_list :: arg_list(), arg_keyword :: arg_keyword()} - @type handle_response :: - {:ok, integer()} - | publish() + @type messages :: + publish() | hello() | authenticate() | unsubscribe() @@ -56,7 +54,11 @@ defmodule Wampex do | unregister() | call() | yield() - | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()} | error() | event() + + @type handle_response :: + {:ok, integer()} + | {:ok, details :: map(), arg_list :: arg_list(), arg_keyword :: arg_keyword()} + | {:update, atom(), messages()} end