From: Christopher Date: Mon, 16 Mar 2020 03:39:28 +0000 (-0500) Subject: initial broker support X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=f0e5835f3211f58cf12c07f83e0ad27ccc52b9ff;p=wampex_router.git initial broker support --- diff --git a/lib/router/proxy.ex b/lib/router/proxy.ex index c931e07..2ddb7d6 100644 --- a/lib/router/proxy.ex +++ b/lib/router/proxy.ex @@ -14,4 +14,6 @@ defmodule Wampex.Router.Proxy do send(to, e) {:noreply, state} end + + def handle_call({:is_up, pid}, _from, s), do: {:reply, Process.alive?(pid), s} end diff --git a/lib/router/session.ex b/lib/router/session.ex index 7aded9e..fda6cab 100644 --- a/lib/router/session.ex +++ b/lib/router/session.ex @@ -12,6 +12,7 @@ defmodule Wampex.Router.Session do alias Wampex.Roles.Peer.{Challenge, Welcome} alias Wampex.Router alias __MODULE__, as: Sess + alias Broker.{Subscribed, Published, Event} alias Dealer.{Invocation, Registered, Result, Unregistered} @enforce_keys [:transport, :transport_pid] @@ -35,6 +36,7 @@ defmodule Wampex.Router.Session do :realm, :name, :goodbye, + :peer_information, roles: [Peer, Broker, Dealer], registrations: [], subscriptions: [], @@ -135,14 +137,13 @@ defmodule Wampex.Router.Session do @handle_message, _, _, - %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data + %SL{data: %Sess{message: msg, roles: roles}} = data ) do debug("Handling Message #{inspect(msg)}") {actions, id, response} = handle_message(msg, roles) - {%SL{data: sess} = data, response} = maybe_update_response(data, response) - {requests, actions} = resp = handle_response(id, actions, requests, response) - debug("Response: #{inspect(resp)}") - {:ok, %SL{data: %Sess{sess | requests: requests}}, actions} + {data, response} = maybe_update_response(data, response) + debug("Response: #{inspect(response)}") + {:ok, data, actions} end @impl true @@ -151,14 +152,133 @@ defmodule Wampex.Router.Session do _, @hello, %SL{ - data: %Sess{id: id, transport: tt, transport_pid: t} + data: + %Sess{ + id: id, + transport: tt, + transport_pid: t, + hello: {:hello, realm, dets} + } = data } = sl ) do # TODO check for authentication request tt.send_request(t, Peer.welcome(%Welcome{session_id: id})) + + {:ok, %SL{sl | data: %Sess{data | realm: realm, peer_information: dets}}, + [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_subscribe, + _, + @subscribe, + %SL{ + data: + %Sess{ + transport: tt, + transport_pid: t, + subscriptions: subs, + realm: realm, + subscribe: {:subscribe, ri, opts, topic}, + db: db + } = data + } = sl + ) do + id = get_global_id() + key = "#{realm}:#{topic}" + + debug("Handling Subscription #{inspect(key)} #{inspect(opts)}") + + case opts do + %{"match" => "wildcard"} -> + Logger.info("Putting wildcard") + ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "") + + _ -> + Logger.info("Putting exact") + ClusterKV.put(db, key, {id, {self(), Node.self()}}) + end + + tt.send_request(t, Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id})) + + {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}}, + [{:next_event, :internal, :transition}]} + end + + @impl true + def handle_resource( + @handle_publish, + _, + @publish, + %SL{ + data: %Sess{ + proxy: proxy, + realm: realm, + transport: tt, + transport_pid: t, + db: db, + publish: {:publish, id, opts, topic, arg_l, arg_kw} + } + } = sl + ) do + pub_id = get_global_id() + key = "#{realm}:#{topic}" + + {_, _, subs} = + {db, key, []} + |> get_subscribers() + |> get_prefix() + |> get_wildcard(realm) + + subs = Enum.uniq(subs) + Logger.info("Subscribers: #{inspect(subs)}") + + Enum.each(subs, fn {id, {pid, node}} -> + send( + {proxy, node}, + {%Event{ + subscription_id: id, + publication_id: pub_id, + arg_list: arg_l, + arg_kw: arg_kw, + options: opts + }, pid} + ) + end) + {:ok, sl, [{:next_event, :internal, :transition}]} end + @impl true + def handle_resource( + @handle_unregister, + _, + @unregister, + %SL{ + data: + %Sess{ + db: db, + registrations: regs, + unregister: {:unregister, request_id, registration_id} + } = data + } = sl + ) do + {id, proc} = + reg = + Enum.find(regs, fn + {^registration_id, proc} -> true + _ -> false + end) + + filter = fn {id, values}, value -> + {id, List.delete(values, value)} + end + + ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter) + {:ok, %SL{sl | data: %Sess{data | registrations: List.delete(regs, reg)}}} + end + @impl true def handle_resource( @handle_register, @@ -171,13 +291,15 @@ defmodule Wampex.Router.Session do transport_pid: t, registrations: regs, db: db, + realm: realm, register: {:register, rid, opts, procedure} } = data } = sl ) do - debug("Handling Registration #{inspect(procedure)}") + key = "#{realm}:#{procedure}" + debug("Handling Registration #{inspect(key)}") id = get_global_id() - ClusterKV.put(db, procedure, {id, {self(), Node.self()}}) + ClusterKV.put(db, key, {id, {self(), Node.self()}}) tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id})) {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}}, @@ -196,34 +318,54 @@ defmodule Wampex.Router.Session do proxy: proxy, transport: tt, transport_pid: t, - call: {:call, ri, dets, proc, al, akw} + realm: realm, + request_id: ri, + call: {:call, call_id, dets, proc, al, akw} } = data } = sl ) do - {key, [{id, {pid, node}} | _]} = ClusterKV.get(db, proc, 500) + key = "#{realm}:#{proc}" + {_key, callees} = ClusterKV.get(db, key, 500) + + req_id = get_request_id(ri) + + actions = + case get_live_callee(proxy, callees) do + {id, {pid, node}} -> + send( + {proxy, node}, + { + { + call_id, + %Invocation{ + request_id: req_id, + registration_id: id, + options: dets, + arg_list: al, + arg_kw: akw + }, + {self(), Node.self()} + }, + pid + } + ) + + [{:next_event, :internal, :transition}] + + {:error, :no_live_callees} = er -> + [{:next_event, :internal, er}] + end - Logger.info( - "Sending Invocation to #{inspect(proxy)} at node #{inspect(node)} with pid #{inspect(pid)}" - ) + {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions} + end - send( - {proxy, node}, - { - { - %Invocation{ - request_id: ri, - registration_id: id, - options: dets, - arg_list: al, - arg_kw: akw - }, - {self(), Node.self()} - }, - pid - } - ) + defp get_live_callee(proxy, []), do: {:error, :no_live_callees} - {:ok, sl, [{:next_event, :internal, :transition}]} + defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do + case GenServer.call({proxy, node}, {:is_up, pid}) do + true -> c + false -> get_live_callee(proxy, t) + end end @impl true @@ -234,20 +376,20 @@ defmodule Wampex.Router.Session do %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} = data ) do - {_, {pid, node}} = + {call_id, _, {pid, node}} = Enum.find(inv, fn - {%Invocation{request_id: ^id}, _} -> true + {_, %Invocation{request_id: ^id}, _} -> true _ -> false end) send( {proxy, node}, - {%Result{request_id: id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid} + {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid} ) inv = Enum.filter(inv, fn - {%Invocation{request_id: ^id}, _} -> false + {_, %Invocation{request_id: ^id}, _} -> false _ -> false end) @@ -255,20 +397,6 @@ defmodule Wampex.Router.Session do [{:next_event, :internal, :transition}]} end - @impl true - def handle_resource( - @handle_subscribe, - _, - @subscribe, - %SL{ - data: %Sess{transport: tt, transport_pid: t, subscribe: s} - } = sl - ) do - # TODO check for authentication request - debug("Handling Subscription #{inspect(s)}") - {:ok, sl, [{:next_event, :internal, :transition}]} - end - @impl true def handle_resource(@handle_abort, _, @abort, data) do Logger.warn("Aborting: #{data.data.error}") @@ -282,7 +410,7 @@ defmodule Wampex.Router.Session do @goodbye, %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data ) do - tt.send_message(t, Peer.goodbye(goodbye)) + send_to_peer(Peer.goodbye(goodbye), tt, t) {:ok, data, []} end @@ -294,18 +422,24 @@ defmodule Wampex.Router.Session do @impl true def handle_info( - {%Invocation{} = e, from}, + {call_id, %Invocation{} = e, from}, _, %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data ) do Logger.info("Received Invocation #{inspect(e)}") - tt.send_request(t, Dealer.invocation(e)) - {:ok, %SL{data | data: %Sess{data.data | invocations: [{e, from} | inv]}}, []} + send_to_peer(Dealer.invocation(e), tt, t) + {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []} + end + + @impl true + def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do + send_to_peer(Broker.event(e), tt, t) + {:ok, data, []} end @impl true def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do - tt.send_request(t, Dealer.result(e)) + send_to_peer(Dealer.result(e), tt, t) {:ok, data, []} end @@ -321,86 +455,73 @@ defmodule Wampex.Router.Session do {:ok, data, []} end - defp handle_response(nil, actions, requests, _), do: {requests, actions} - - defp handle_response(id, actions, requests, response) do - Enum.reduce_while(requests, {requests, actions}, fn - {^id, nil}, _ -> - {:halt, {remove_request(id, requests), actions}} + @impl true + def handle_termination(reason, _, %SL{ + data: %Sess{db: db, registrations: regs, subscriptions: subs} + }) do + filter = fn {id, values}, value -> + {id, List.delete(values, value)} + end - {^id, from}, _ -> - {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}} + me = {self(), Node.self()} - {_, _}, acc -> - {:cont, acc} + Enum.each(regs, fn {id, proc} -> + ClusterKV.update(db, proc, {id, me}, filter) end) end - defp get_global_id do - Enum.random(0..@max_id) + defp get_subscribers({db, key, acc}) do + case ClusterKV.get(db, key, 500) do + {_, subscribers} -> {db, key, acc ++ subscribers} + _ -> {db, key, acc} + end end - defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do - send(from, {:progress, arg_l, arg_kw}) - [] - end + defp get_prefix({db, key, acc}) do + case ClusterKV.prefix(db, key, ".", 2, 500) do + {_, subscribers} -> + {db, key, acc ++ subscribers} - defp get_response_action(from, response), do: {:reply, from, response} + l when is_list(l) -> + {db, key, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)} - defp maybe_update_response(data, {:update, key, resp}) do - {%SL{data | data: Map.put(data.data, key, resp)}, resp} + _ -> + {db, key, acc} + end end - defp maybe_update_response(data, resp), do: {data, resp} + defp get_wildcard({db, key, acc}, realm) do + case ClusterKV.wildcard(db, realm, key, ".", ":", "") do + l when is_list(l) -> + {db, key, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)} - defp do_send(r_id, tt, t, message) do - {request_id, message} = maybe_inject_request_id(r_id, message) - tt.send_request(t, remove_nil_values(message)) - request_id + _ -> + {db, key, acc} + end end - defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1) - - defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message} - - defp maybe_inject_request_id(r_id, message) do - request_id = get_request_id(r_id) - {request_id, List.insert_at(message, 1, request_id)} + defp send_to_peer(msg, transport, pid) do + transport.send_request(pid, remove_nil_values(msg)) end - defp get_request_id(current_id) when current_id == @max_id do - 1 + defp get_global_id do + Enum.random(0..@max_id) end - defp get_request_id(current_id) do - current_id + 1 + defp maybe_update_response(data, {:update, key, resp}) do + {%SL{data | data: Map.put(data.data, key, resp)}, resp} end - defp remove_request(id, requests) do - Enum.filter(requests, fn - {^id, _} -> false - {_id, _} -> true - end) - end + defp maybe_update_response(data, resp), do: {data, resp} - defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs} + defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1) - defp send_message_queue(r_id, mq, tt, t, reqs) do - mq - |> Enum.reverse() - |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} -> - r = do_send(id, tt, t, request) - {r, [{r, from} | requests]} - end) + defp get_request_id(current_id) when current_id == @max_id do + 1 end - def remove_cast_requests([]), do: [] - - def remove_cast_requests(requests) do - Enum.filter(requests, fn - {_, nil} -> false - {_, _} -> true - end) + defp get_request_id(current_id) do + current_id + 1 end defp handle_message(msg, roles) do diff --git a/mix.exs b/mix.exs index 66459e1..19e52f7 100644 --- a/mix.exs +++ b/mix.exs @@ -37,9 +37,9 @@ defmodule Wampex.MixProject do defp deps do [ - {:cluster_kv, - git: "https://gitlab.com/entropealabs/cluster_kv.git", - tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"}, + {:cluster_kv, path: "../cluster_kv"}, + # git: "https://gitlab.com/entropealabs/cluster_kv.git", + # tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"}, {:cors_plug, "~> 2.0"}, {:credo, "~> 1.2", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false}, diff --git a/priv/router.json b/priv/router.json index a0b99a3..9f60d81 100644 --- a/priv/router.json +++ b/priv/router.json @@ -96,7 +96,13 @@ "Call": { "Type": "Task", "Resource": "HandleCall", - "Next": "Established" + "Next": "Established", + "Catch": [ + { + "ErrorEquals": ["{:error, :no_live_callees}"], + "Next": "Error" + } + ] }, "Yield": { "Type": "Task", diff --git a/test/support/test_subscriber.ex b/test/support/test_subscriber.ex index 949b36a..1c803b3 100644 --- a/test/support/test_subscriber.ex +++ b/test/support/test_subscriber.ex @@ -12,6 +12,18 @@ defmodule TestSubscriber do def init({test, name, topic}) do {:ok, sub} = Client.subscribe(name, %Subscribe{topic: topic}) + {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data.test"}) + {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data"}) + + {:ok, sub} = + Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}}) + + {:ok, sub} = + Client.subscribe(name, %Subscribe{ + topic: "com..test.temp", + options: %{"match" => "wildcard"} + }) + send(test, {:subscribed, sub}) {:ok, {test, name, sub}} end diff --git a/test/wampex_test.exs b/test/wampex_test.exs index a1dfa47..a559616 100644 --- a/test/wampex_test.exs +++ b/test/wampex_test.exs @@ -19,7 +19,7 @@ defmodule WampexTest do @url "ws://localhost:4000/ws" @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"} - @realm %Realm{name: "com.myrealm"} + @realm %Realm{name: "myrealm"} @roles [Callee, Caller, Publisher, Subscriber] @device "as987d9a8sd79a87ds" @@ -281,7 +281,7 @@ defmodule WampexTest do assert_receive {:registered, id} end - @tag :client + @tag :abort test "abort" do Process.flag(:trap_exit, true) callee_name = TestAbort @@ -306,7 +306,8 @@ defmodule WampexTest do procedure: "com.actuator.#{@device}.light", arg_list: [1], arg_kw: %{color: "#FFFFFF"} - }) + }), + 60 ) assert_receive {:invocation, _, _, _, _, _} @@ -324,7 +325,7 @@ defmodule WampexTest do test "subscriber receives events from publisher" do name = TestSubscriberEvents Client.start_link(name: name, session: @session) - TestSubscriber.start_link(self(), name, "com.data.temp") + TestSubscriber.start_link(self(), name, "com.data.test.temp") assert_receive {:subscribed, id} Client.start_link(name: TestPublisher, session: @session) @@ -332,12 +333,14 @@ defmodule WampexTest do Client.cast_send_request( TestPublisher, Publisher.publish(%Publish{ - topic: "com.data.temp", + topic: "com.data.test.temp", arg_list: [12.5, 45.6, 87.5], arg_kw: %{loc: "60645"} }) ) assert_receive {:event, _, _, _, _, _}, 2000 + assert_receive {:event, _, _, _, _, _}, 2000 + assert_receive {:event, _, _, _, _, _}, 2000 end end