]> Entropealabs - cluster_kv.git/commitdiff
clean up interfaces
authorChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 01:11:10 +0000 (19:11 -0600)
committerChristopher <chris@entropealabs.com>
Sun, 8 Mar 2020 01:11:10 +0000 (19:11 -0600)
lib/cluster_kv/db.ex
lib/cluster_kv/ring.ex

index ffd69cc91cea33fdfbfd444d7a14d1c209d2825d..f3be208f061a50858f7ced2b75de2f60801055d9 100644 (file)
@@ -4,7 +4,7 @@ defmodule ClusterKV.DB do
 
   alias __MODULE__
 
-  @db_options [:set]
+  @db_options [:set, :public]
 
   defstruct [:db]
 
@@ -22,9 +22,9 @@ defmodule ClusterKV.DB do
     GenServer.cast(name, {:put, key, value})
   end
 
-  @spec put_or_update(name :: module(), key :: binary(), value :: term()) :: :ok
-  def put_or_update(name, key, value) do
-    GenServer.cast(name, {:put_or_update, key, value})
+  @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok
+  def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
+    GenServer.cast(name, {:upsert, key, value, fun})
   end
 
   def stream(name) do
@@ -42,29 +42,48 @@ defmodule ClusterKV.DB do
   end
 
   def handle_call(:stream, _from, %DB{db: db} = state) do
-    {:reply, Stream.iterate(:ets.first(db), &:ets.next(db, &1)), state}
+    {:reply, get_stream(db), state}
   end
 
   def handle_call({:get, key}, _from, %DB{db: db} = state) do
-    {:reply,
-     case :ets.lookup(db, key) do
-       [] -> :not_found
-       [other] -> other
-     end, state}
+    {:reply, do_get(db, key), state}
   end
 
   def handle_cast({:put, key, value}, %DB{db: db} = state) do
-    :ets.insert(db, {key, value})
+    :ets.insert(db, {key, [value]})
     {:noreply, state}
   end
 
-  def handle_cast({:put_or_update, key, value}, %DB{db: db} = state) do
+  def handle_cast({:upsert, key, value, fun}, %DB{db: db} = state) do
     case :ets.lookup(db, key) do
-      [] -> :ets.insert(db, {key, value})
-      [{_, val}] when is_list(val) -> :ets.insert(db, {key, [value | val]})
-      [{_, val}] -> :ets.insert(db, {key, [value, val]})
+      [] ->
+        :ets.insert(db, {key, [value]})
+
+      [{_, val} = old] when is_list(val) ->
+        new = fun.(old, value)
+        1 = :ets.select_replace(db, [{old, [], [{:const, new}]}])
     end
 
     {:noreply, state}
   end
+
+  defp do_get(db, key) do
+    case :ets.lookup(db, key) do
+      [] -> :not_found
+      [other] -> other
+    end
+  end
+
+  defp get_stream(db) do
+    db
+    |> :ets.first()
+    |> Stream.iterate(&:ets.next(db, &1))
+    |> Stream.take_while(fn
+      :"$end_of_table" -> false
+      _ -> true
+    end)
+    |> Stream.map(fn key ->
+      do_get(db, key)
+    end)
+  end
 end
index dfc1420f05d3044e668de9c586f4451238227968..70825ada8bc0189230e4cf25b8ef0b07a83d6b84 100644 (file)
@@ -117,7 +117,7 @@ defmodule ClusterKV.Ring do
         @ready,
         %SL{data: %Ring{db: db}} = sl
       ) do
-    {:ok, sl, [{:reply, from, do_stream(db)}]}
+    {:ok, sl, [{:reply, from, DB.stream(db)}]}
   end
 
   def handle_call(
@@ -127,7 +127,6 @@ defmodule ClusterKV.Ring do
         %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
       ) do
     node = get_node(key, r, me, repls)
-
     ref = make_ref()
     send({n, node}, {:get_key, key, ref, me})
     {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []}
@@ -185,22 +184,9 @@ defmodule ClusterKV.Ring do
         @ready,
         %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
       ) do
-    parts =
-      key
-      |> String.split(split_on)
-
-    Enum.take_while(Enum.with_index(parts), fn {k, i} ->
-      case k == wildcard do
-        true ->
-          true
-
-        false ->
-          k = Enum.join([header, k, i], join)
-          nodes = HashRing.key_to_nodes(r, k, repls)
-          send_sync(n, nodes, k, {parts, value})
-          false
-      end
-    end)
+    key
+    |> String.split(split_on)
+    |> send_wildcard(header, wildcard, join, value, n, r, repls)
 
     {:ok, sl, []}
   end
@@ -219,15 +205,12 @@ defmodule ClusterKV.Ring do
 
   def handle_cast(_, _, sl), do: {:ok, sl, []}
 
-  def handle_info({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
-    Enum.each(keys, fn {key, value} ->
-      DB.put_or_update(db, key, value)
-    end)
-
+  def handle_info({:sync, key, value}, @ready, %SL{data: %Ring{db: db}} = sl) do
+    DB.upsert(db, key, value)
     {:ok, sl, []}
   end
 
-  def handle_info({:sync, _keys}, _, sl), do: {:ok, sl, []}
+  def handle_info({:sync, _key, _v}, _, sl), do: {:ok, sl, []}
 
   def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do
     val = DB.get(db, key)
@@ -240,23 +223,13 @@ defmodule ClusterKV.Ring do
         @ready,
         %SL{data: %Ring{name: n, db: db}} = sl
       ) do
-    vals = DB.get(db, key)
-    Logger.info("Got Wildcard Value: #{inspect(vals)}")
-
-    vals =
-      case vals do
+    val =
+      case DB.get(db, key) do
         :not_found ->
           :not_found
 
-        vals ->
+        {_k, vals} ->
           parts = Enum.with_index(parts)
-          {_k, vals} = vals
-
-          vals =
-            case vals do
-              v when is_list(v) -> v
-              v -> [v]
-            end
 
           Enum.filter(vals, fn {p, _val} ->
             Logger.info("Checking wilcard vals #{inspect(p)} against #{inspect(parts)}")
@@ -264,21 +237,14 @@ defmodule ClusterKV.Ring do
             Enum.all?(parts, fn {k, i} ->
               case Enum.at(p, i) do
                 ^k -> true
-                w when w == wildcard -> true
+                ^wildcard -> true
                 _ -> false
               end
             end)
           end)
       end
 
-    vals =
-      case vals do
-        [] -> :not_found
-        [o] -> o
-        o -> o
-      end
-
-    send({n, node}, {:reply, vals, ref})
+    send({n, node}, {:reply, val, ref})
     {:ok, sl, []}
   end
 
@@ -290,22 +256,22 @@ defmodule ClusterKV.Ring do
   def handle_info(
         {:nodeup, node, info},
         _,
-        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+        %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
       ) do
     Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.add_node(r, node)
-    sync_db(n, me, db, ring, repls)
+    sync_db(n, me, db, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
   end
 
   def handle_info(
         {:nodedown, node, info},
         _,
-        %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+        %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
       ) do
     Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
     ring = HashRing.remove_node(r, node)
-    sync_db(n, me, db, ring, repls)
+    sync_db(n, me, db, ring, repls)
     {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
   end
 
@@ -320,47 +286,17 @@ defmodule ClusterKV.Ring do
     send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
   end
 
-  defp get_node(key, r, node, repls) do
-    nodes = HashRing.key_to_nodes(r, key, repls)
-
-    case node in nodes do
-      true -> node
-      false -> Enum.random(nodes)
-    end
-  end
-
-  defp do_stream(db) do
-    db
-    |> DB.stream()
-    |> Stream.take_while(fn
-      :"$end_of_table" -> false
-      _ -> true
-    end)
-    |> Stream.map(fn key ->
-      DB.get(db, key)
-    end)
-  end
+  def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do
+    parts
+    |> Enum.with_index()
+    |> Enum.take_while(fn
+      {^wildcard, _} ->
+        true
 
-  defp sync_db(name, me, db, ring, repls) do
-    Task.start(fn ->
-      db
-      |> do_stream()
-      |> Stream.chunk_every(10)
-      |> Stream.each(fn chunk ->
-        Enum.each(chunk, fn {key, v} ->
-          nodes =
-            ring
-            |> HashRing.key_to_nodes(key, repls)
-            |> Enum.filter(fn
-              ^me -> false
-              _ -> true
-            end)
-
-          Logger.debug("Sync: #{key}: #{inspect(v)}")
-          send_sync(name, nodes, key, v)
-        end)
-      end)
-      |> Stream.run()
+      {k, i} ->
+        k = Enum.join([header, k, i], join)
+        nodes = HashRing.key_to_nodes(ring, k, repls)
+        send_sync(name, nodes, k, {parts, value})
     end)
   end
 
@@ -379,6 +315,7 @@ defmodule ClusterKV.Ring do
   defp get_final_value(res, val) when is_list(res),
     do:
       [val | res]
+      |> List.flatten()
       |> Enum.filter(fn
         :not_found -> false
         _ -> true
@@ -407,7 +344,7 @@ defmodule ClusterKV.Ring do
 
   defp send_sync(name, nodes, key, value) do
     Enum.each(nodes, fn n ->
-      send({name, n}, {:sync, [{key, value}]})
+      send({name, n}, {:sync, key, value})
     end)
   end
 
@@ -416,4 +353,13 @@ defmodule ClusterKV.Ring do
       HashRing.add_node(acc, n)
     end)
   end
+
+  defp get_node(key, r, node, repls) do
+    nodes = HashRing.key_to_nodes(r, key, repls)
+
+    case node in nodes do
+      true -> node
+      false -> Enum.random(nodes)
+    end
+  end
 end