]> Entropealabs - wampex_client.git/commitdiff
initial broker support
authorChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 03:39:28 +0000 (22:39 -0500)
committerChristopher <chris@entropealabs.com>
Mon, 16 Mar 2020 03:39:28 +0000 (22:39 -0500)
lib/router/proxy.ex
lib/router/session.ex
mix.exs
priv/router.json
test/support/test_subscriber.ex
test/wampex_test.exs

index c931e07a7e6d22fb4440c586aa3a5196c5968bdd..2ddb7d6e854c78c680d00fe6cbb106622cefe71d 100644 (file)
@@ -14,4 +14,6 @@ defmodule Wampex.Router.Proxy do
     send(to, e)
     {:noreply, state}
   end
+
+  def handle_call({:is_up, pid}, _from, s), do: {:reply, Process.alive?(pid), s}
 end
index 7aded9e18059d5fdd84aa5920dc009828ed738ec..fda6cabfde622b99f0c6194c84c057830c655bfd 100644 (file)
@@ -12,6 +12,7 @@ defmodule Wampex.Router.Session do
   alias Wampex.Roles.Peer.{Challenge, Welcome}
   alias Wampex.Router
   alias __MODULE__, as: Sess
+  alias Broker.{Subscribed, Published, Event}
   alias Dealer.{Invocation, Registered, Result, Unregistered}
 
   @enforce_keys [:transport, :transport_pid]
@@ -35,6 +36,7 @@ defmodule Wampex.Router.Session do
     :realm,
     :name,
     :goodbye,
+    :peer_information,
     roles: [Peer, Broker, Dealer],
     registrations: [],
     subscriptions: [],
@@ -135,14 +137,13 @@ defmodule Wampex.Router.Session do
         @handle_message,
         _,
         _,
-        %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
+        %SL{data: %Sess{message: msg, roles: roles}} = data
       ) do
     debug("Handling Message #{inspect(msg)}")
     {actions, id, response} = handle_message(msg, roles)
-    {%SL{data: sess} = data, response} = maybe_update_response(data, response)
-    {requests, actions} = resp = handle_response(id, actions, requests, response)
-    debug("Response: #{inspect(resp)}")
-    {:ok, %SL{data: %Sess{sess | requests: requests}}, actions}
+    {data, response} = maybe_update_response(data, response)
+    debug("Response: #{inspect(response)}")
+    {:ok, data, actions}
   end
 
   @impl true
@@ -151,14 +152,133 @@ defmodule Wampex.Router.Session do
         _,
         @hello,
         %SL{
-          data: %Sess{id: id, transport: tt, transport_pid: t}
+          data:
+            %Sess{
+              id: id,
+              transport: tt,
+              transport_pid: t,
+              hello: {:hello, realm, dets}
+            } = data
         } = sl
       ) do
     # TODO check for authentication request
     tt.send_request(t, Peer.welcome(%Welcome{session_id: id}))
+
+    {:ok, %SL{sl | data: %Sess{data | realm: realm, peer_information: dets}},
+     [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_subscribe,
+        _,
+        @subscribe,
+        %SL{
+          data:
+            %Sess{
+              transport: tt,
+              transport_pid: t,
+              subscriptions: subs,
+              realm: realm,
+              subscribe: {:subscribe, ri, opts, topic},
+              db: db
+            } = data
+        } = sl
+      ) do
+    id = get_global_id()
+    key = "#{realm}:#{topic}"
+
+    debug("Handling Subscription #{inspect(key)} #{inspect(opts)}")
+
+    case opts do
+      %{"match" => "wildcard"} ->
+        Logger.info("Putting wildcard")
+        ClusterKV.put_wildcard(db, realm, topic, {id, {self(), Node.self()}}, ".", ":", "")
+
+      _ ->
+        Logger.info("Putting exact")
+        ClusterKV.put(db, key, {id, {self(), Node.self()}})
+    end
+
+    tt.send_request(t, Broker.subscribed(%Subscribed{request_id: ri, subscription_id: id}))
+
+    {:ok, %SL{sl | data: %Sess{data | subscriptions: [{id, topic} | subs]}},
+     [{:next_event, :internal, :transition}]}
+  end
+
+  @impl true
+  def handle_resource(
+        @handle_publish,
+        _,
+        @publish,
+        %SL{
+          data: %Sess{
+            proxy: proxy,
+            realm: realm,
+            transport: tt,
+            transport_pid: t,
+            db: db,
+            publish: {:publish, id, opts, topic, arg_l, arg_kw}
+          }
+        } = sl
+      ) do
+    pub_id = get_global_id()
+    key = "#{realm}:#{topic}"
+
+    {_, _, subs} =
+      {db, key, []}
+      |> get_subscribers()
+      |> get_prefix()
+      |> get_wildcard(realm)
+
+    subs = Enum.uniq(subs)
+    Logger.info("Subscribers: #{inspect(subs)}")
+
+    Enum.each(subs, fn {id, {pid, node}} ->
+      send(
+        {proxy, node},
+        {%Event{
+           subscription_id: id,
+           publication_id: pub_id,
+           arg_list: arg_l,
+           arg_kw: arg_kw,
+           options: opts
+         }, pid}
+      )
+    end)
+
     {:ok, sl, [{:next_event, :internal, :transition}]}
   end
 
+  @impl true
+  def handle_resource(
+        @handle_unregister,
+        _,
+        @unregister,
+        %SL{
+          data:
+            %Sess{
+              db: db,
+              registrations: regs,
+              unregister: {:unregister, request_id, registration_id}
+            } = data
+        } = sl
+      ) do
+    {id, proc} =
+      reg =
+      Enum.find(regs, fn
+        {^registration_id, proc} -> true
+        _ -> false
+      end)
+
+    filter = fn {id, values}, value ->
+      {id, List.delete(values, value)}
+    end
+
+    ClusterKV.update(db, proc, {id, {self(), Node.self()}}, filter)
+    {:ok, %SL{sl | data: %Sess{data | registrations: List.delete(regs, reg)}}}
+  end
+
   @impl true
   def handle_resource(
         @handle_register,
@@ -171,13 +291,15 @@ defmodule Wampex.Router.Session do
               transport_pid: t,
               registrations: regs,
               db: db,
+              realm: realm,
               register: {:register, rid, opts, procedure}
             } = data
         } = sl
       ) do
-    debug("Handling Registration #{inspect(procedure)}")
+    key = "#{realm}:#{procedure}"
+    debug("Handling Registration #{inspect(key)}")
     id = get_global_id()
-    ClusterKV.put(db, procedure, {id, {self(), Node.self()}})
+    ClusterKV.put(db, key, {id, {self(), Node.self()}})
     tt.send_request(t, Dealer.registered(%Registered{request_id: rid, registration_id: id}))
 
     {:ok, %SL{sl | data: %Sess{data | registrations: [{id, procedure} | regs]}},
@@ -196,34 +318,54 @@ defmodule Wampex.Router.Session do
               proxy: proxy,
               transport: tt,
               transport_pid: t,
-              call: {:call, ri, dets, proc, al, akw}
+              realm: realm,
+              request_id: ri,
+              call: {:call, call_id, dets, proc, al, akw}
             } = data
         } = sl
       ) do
-    {key, [{id, {pid, node}} | _]} = ClusterKV.get(db, proc, 500)
+    key = "#{realm}:#{proc}"
+    {_key, callees} = ClusterKV.get(db, key, 500)
+
+    req_id = get_request_id(ri)
+
+    actions =
+      case get_live_callee(proxy, callees) do
+        {id, {pid, node}} ->
+          send(
+            {proxy, node},
+            {
+              {
+                call_id,
+                %Invocation{
+                  request_id: req_id,
+                  registration_id: id,
+                  options: dets,
+                  arg_list: al,
+                  arg_kw: akw
+                },
+                {self(), Node.self()}
+              },
+              pid
+            }
+          )
+
+          [{:next_event, :internal, :transition}]
+
+        {:error, :no_live_callees} = er ->
+          [{:next_event, :internal, er}]
+      end
 
-    Logger.info(
-      "Sending Invocation to #{inspect(proxy)} at node #{inspect(node)} with pid #{inspect(pid)}"
-    )
+    {:ok, %SL{sl | data: %Sess{data | request_id: req_id}}, actions}
+  end
 
-    send(
-      {proxy, node},
-      {
-        {
-          %Invocation{
-            request_id: ri,
-            registration_id: id,
-            options: dets,
-            arg_list: al,
-            arg_kw: akw
-          },
-          {self(), Node.self()}
-        },
-        pid
-      }
-    )
+  defp get_live_callee(proxy, []), do: {:error, :no_live_callees}
 
-    {:ok, sl, [{:next_event, :internal, :transition}]}
+  defp get_live_callee(proxy, [{_id, {pid, node}} = c | t]) do
+    case GenServer.call({proxy, node}, {:is_up, pid}) do
+      true -> c
+      false -> get_live_callee(proxy, t)
+    end
   end
 
   @impl true
@@ -234,20 +376,20 @@ defmodule Wampex.Router.Session do
         %SL{data: %Sess{proxy: proxy, invocations: inv, yield: {:yield, id, dets, arg_l, arg_kw}}} =
           data
       ) do
-    {_, {pid, node}} =
+    {call_id, _, {pid, node}} =
       Enum.find(inv, fn
-        {%Invocation{request_id: ^id}, _} -> true
+        {_, %Invocation{request_id: ^id}, _} -> true
         _ -> false
       end)
 
     send(
       {proxy, node},
-      {%Result{request_id: id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
+      {%Result{request_id: call_id, arg_list: arg_l, arg_kw: arg_kw, options: dets}, pid}
     )
 
     inv =
       Enum.filter(inv, fn
-        {%Invocation{request_id: ^id}, _} -> false
+        {_, %Invocation{request_id: ^id}, _} -> false
         _ -> false
       end)
 
@@ -255,20 +397,6 @@ defmodule Wampex.Router.Session do
      [{:next_event, :internal, :transition}]}
   end
 
-  @impl true
-  def handle_resource(
-        @handle_subscribe,
-        _,
-        @subscribe,
-        %SL{
-          data: %Sess{transport: tt, transport_pid: t, subscribe: s}
-        } = sl
-      ) do
-    # TODO check for authentication request
-    debug("Handling Subscription #{inspect(s)}")
-    {:ok, sl, [{:next_event, :internal, :transition}]}
-  end
-
   @impl true
   def handle_resource(@handle_abort, _, @abort, data) do
     Logger.warn("Aborting: #{data.data.error}")
@@ -282,7 +410,7 @@ defmodule Wampex.Router.Session do
         @goodbye,
         %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
       ) do
-    tt.send_message(t, Peer.goodbye(goodbye))
+    send_to_peer(Peer.goodbye(goodbye), tt, t)
     {:ok, data, []}
   end
 
@@ -294,18 +422,24 @@ defmodule Wampex.Router.Session do
 
   @impl true
   def handle_info(
-        {%Invocation{} = e, from},
+        {call_id, %Invocation{} = e, from},
         _,
         %SL{data: %Sess{transport: tt, transport_pid: t, invocations: inv}} = data
       ) do
     Logger.info("Received Invocation #{inspect(e)}")
-    tt.send_request(t, Dealer.invocation(e))
-    {:ok, %SL{data | data: %Sess{data.data | invocations: [{e, from} | inv]}}, []}
+    send_to_peer(Dealer.invocation(e), tt, t)
+    {:ok, %SL{data | data: %Sess{data.data | invocations: [{call_id, e, from} | inv]}}, []}
+  end
+
+  @impl true
+  def handle_info(%Event{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
+    send_to_peer(Broker.event(e), tt, t)
+    {:ok, data, []}
   end
 
   @impl true
   def handle_info(%Result{} = e, _, %SL{data: %Sess{transport: tt, transport_pid: t}} = data) do
-    tt.send_request(t, Dealer.result(e))
+    send_to_peer(Dealer.result(e), tt, t)
     {:ok, data, []}
   end
 
@@ -321,86 +455,73 @@ defmodule Wampex.Router.Session do
     {:ok, data, []}
   end
 
-  defp handle_response(nil, actions, requests, _), do: {requests, actions}
-
-  defp handle_response(id, actions, requests, response) do
-    Enum.reduce_while(requests, {requests, actions}, fn
-      {^id, nil}, _ ->
-        {:halt, {remove_request(id, requests), actions}}
+  @impl true
+  def handle_termination(reason, _, %SL{
+        data: %Sess{db: db, registrations: regs, subscriptions: subs}
+      }) do
+    filter = fn {id, values}, value ->
+      {id, List.delete(values, value)}
+    end
 
-      {^id, from}, _ ->
-        {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
+    me = {self(), Node.self()}
 
-      {_, _}, acc ->
-        {:cont, acc}
+    Enum.each(regs, fn {id, proc} ->
+      ClusterKV.update(db, proc, {id, me}, filter)
     end)
   end
 
-  defp get_global_id do
-    Enum.random(0..@max_id)
+  defp get_subscribers({db, key, acc}) do
+    case ClusterKV.get(db, key, 500) do
+      {_, subscribers} -> {db, key, acc ++ subscribers}
+      _ -> {db, key, acc}
+    end
   end
 
-  defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
-    send(from, {:progress, arg_l, arg_kw})
-    []
-  end
+  defp get_prefix({db, key, acc}) do
+    case ClusterKV.prefix(db, key, ".", 2, 500) do
+      {_, subscribers} ->
+        {db, key, acc ++ subscribers}
 
-  defp get_response_action(from, response), do: {:reply, from, response}
+      l when is_list(l) ->
+        {db, key, acc ++ Enum.flat_map(l, fn {_, subscribers} -> subscribers end)}
 
-  defp maybe_update_response(data, {:update, key, resp}) do
-    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
+      _ ->
+        {db, key, acc}
+    end
   end
 
-  defp maybe_update_response(data, resp), do: {data, resp}
+  defp get_wildcard({db, key, acc}, realm) do
+    case ClusterKV.wildcard(db, realm, key, ".", ":", "") do
+      l when is_list(l) ->
+        {db, key, acc ++ Enum.map(l, fn {_, subscriber} -> subscriber end)}
 
-  defp do_send(r_id, tt, t, message) do
-    {request_id, message} = maybe_inject_request_id(r_id, message)
-    tt.send_request(t, remove_nil_values(message))
-    request_id
+      _ ->
+        {db, key, acc}
+    end
   end
 
-  defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
-
-  defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
-
-  defp maybe_inject_request_id(r_id, message) do
-    request_id = get_request_id(r_id)
-    {request_id, List.insert_at(message, 1, request_id)}
+  defp send_to_peer(msg, transport, pid) do
+    transport.send_request(pid, remove_nil_values(msg))
   end
 
-  defp get_request_id(current_id) when current_id == @max_id do
-    1
+  defp get_global_id do
+    Enum.random(0..@max_id)
   end
 
-  defp get_request_id(current_id) do
-    current_id + 1
+  defp maybe_update_response(data, {:update, key, resp}) do
+    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
   end
 
-  defp remove_request(id, requests) do
-    Enum.filter(requests, fn
-      {^id, _} -> false
-      {_id, _} -> true
-    end)
-  end
+  defp maybe_update_response(data, resp), do: {data, resp}
 
-  defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}
+  defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
 
-  defp send_message_queue(r_id, mq, tt, t, reqs) do
-    mq
-    |> Enum.reverse()
-    |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
-      r = do_send(id, tt, t, request)
-      {r, [{r, from} | requests]}
-    end)
+  defp get_request_id(current_id) when current_id == @max_id do
+    1
   end
 
-  def remove_cast_requests([]), do: []
-
-  def remove_cast_requests(requests) do
-    Enum.filter(requests, fn
-      {_, nil} -> false
-      {_, _} -> true
-    end)
+  defp get_request_id(current_id) do
+    current_id + 1
   end
 
   defp handle_message(msg, roles) do
diff --git a/mix.exs b/mix.exs
index 66459e18d58b8f54bf08c161fc569c0348a6e436..19e52f7c744c21e353e2226de88ec73c89fb6548 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -37,9 +37,9 @@ defmodule Wampex.MixProject do
 
   defp deps do
     [
-      {:cluster_kv,
-       git: "https://gitlab.com/entropealabs/cluster_kv.git",
-       tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"},
+      {:cluster_kv, path: "../cluster_kv"},
+      # git: "https://gitlab.com/entropealabs/cluster_kv.git",
+      # tag: "c274b77acb2711ac0c8a79253cb7870c606f7297"},
       {:cors_plug, "~> 2.0"},
       {:credo, "~> 1.2", only: [:dev, :test], runtime: false},
       {:dialyxir, "~> 0.5.1", only: [:dev, :test], runtime: false},
index a0b99a396db0a636e95cff5e5c027ff19de1643e..9f60d81c1eee0b409621777e344c8199319edb33 100644 (file)
     "Call": {
       "Type": "Task",
       "Resource": "HandleCall",
-      "Next": "Established"
+      "Next": "Established",
+      "Catch": [
+        {
+          "ErrorEquals": ["{:error, :no_live_callees}"],
+          "Next": "Error"
+        }
+      ]
     },
     "Yield": {
       "Type": "Task",
index 949b36a2dc9c8126b8cc2ed818376cbc0eb1b0fc..1c803b3ac9865fb67386cfe69eb143b0b553fc9c 100644 (file)
@@ -12,6 +12,18 @@ defmodule TestSubscriber do
 
   def init({test, name, topic}) do
     {:ok, sub} = Client.subscribe(name, %Subscribe{topic: topic})
+    {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data.test"})
+    {:ok, sub} = Client.subscribe(name, %Subscribe{topic: "com.data"})
+
+    {:ok, sub} =
+      Client.subscribe(name, %Subscribe{topic: "com...temp", options: %{"match" => "wildcard"}})
+
+    {:ok, sub} =
+      Client.subscribe(name, %Subscribe{
+        topic: "com..test.temp",
+        options: %{"match" => "wildcard"}
+      })
+
     send(test, {:subscribed, sub})
     {:ok, {test, name, sub}}
   end
index a1dfa47bb37f07e0cf2bad93c7d4e4cc0b95ae3b..a55961633d3fe2eac0b28fdbeceb53ae72e1570c 100644 (file)
@@ -19,7 +19,7 @@ defmodule WampexTest do
 
   @url "ws://localhost:4000/ws"
   @auth %Authentication{authid: "entone", authmethods: ["wampcra"], secret: "test1234"}
-  @realm %Realm{name: "com.myrealm"}
+  @realm %Realm{name: "myrealm"}
   @roles [Callee, Caller, Publisher, Subscriber]
   @device "as987d9a8sd79a87ds"
 
@@ -281,7 +281,7 @@ defmodule WampexTest do
     assert_receive {:registered, id}
   end
 
-  @tag :client
+  @tag :abort
   test "abort" do
     Process.flag(:trap_exit, true)
     callee_name = TestAbort
@@ -306,7 +306,8 @@ defmodule WampexTest do
           procedure: "com.actuator.#{@device}.light",
           arg_list: [1],
           arg_kw: %{color: "#FFFFFF"}
-        })
+        }),
+        60
       )
 
     assert_receive {:invocation, _, _, _, _, _}
@@ -324,7 +325,7 @@ defmodule WampexTest do
   test "subscriber receives events from publisher" do
     name = TestSubscriberEvents
     Client.start_link(name: name, session: @session)
-    TestSubscriber.start_link(self(), name, "com.data.temp")
+    TestSubscriber.start_link(self(), name, "com.data.test.temp")
     assert_receive {:subscribed, id}
 
     Client.start_link(name: TestPublisher, session: @session)
@@ -332,12 +333,14 @@ defmodule WampexTest do
     Client.cast_send_request(
       TestPublisher,
       Publisher.publish(%Publish{
-        topic: "com.data.temp",
+        topic: "com.data.test.temp",
         arg_list: [12.5, 45.6, 87.5],
         arg_kw: %{loc: "60645"}
       })
     )
 
     assert_receive {:event, _, _, _, _, _}, 2000
+    assert_receive {:event, _, _, _, _, _}, 2000
+    assert_receive {:event, _, _, _, _, _}, 2000
   end
 end