From: Christopher Date: Sun, 8 Mar 2020 01:11:10 +0000 (-0600) Subject: clean up interfaces X-Git-Url: http://git.entropealabs.com/?a=commitdiff_plain;h=802c2ce9827974d01a8abd8e3d15716cdb074949;p=cluster_kv.git clean up interfaces --- diff --git a/lib/cluster_kv/db.ex b/lib/cluster_kv/db.ex index ffd69cc..f3be208 100644 --- a/lib/cluster_kv/db.ex +++ b/lib/cluster_kv/db.ex @@ -4,7 +4,7 @@ defmodule ClusterKV.DB do alias __MODULE__ - @db_options [:set] + @db_options [:set, :public] defstruct [:db] @@ -22,9 +22,9 @@ defmodule ClusterKV.DB do GenServer.cast(name, {:put, key, value}) end - @spec put_or_update(name :: module(), key :: binary(), value :: term()) :: :ok - def put_or_update(name, key, value) do - GenServer.cast(name, {:put_or_update, key, value}) + @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok + def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do + GenServer.cast(name, {:upsert, key, value, fun}) end def stream(name) do @@ -42,29 +42,48 @@ defmodule ClusterKV.DB do end def handle_call(:stream, _from, %DB{db: db} = state) do - {:reply, Stream.iterate(:ets.first(db), &:ets.next(db, &1)), state} + {:reply, get_stream(db), state} end def handle_call({:get, key}, _from, %DB{db: db} = state) do - {:reply, - case :ets.lookup(db, key) do - [] -> :not_found - [other] -> other - end, state} + {:reply, do_get(db, key), state} end def handle_cast({:put, key, value}, %DB{db: db} = state) do - :ets.insert(db, {key, value}) + :ets.insert(db, {key, [value]}) {:noreply, state} end - def handle_cast({:put_or_update, key, value}, %DB{db: db} = state) do + def handle_cast({:upsert, key, value, fun}, %DB{db: db} = state) do case :ets.lookup(db, key) do - [] -> :ets.insert(db, {key, value}) - [{_, val}] when is_list(val) -> :ets.insert(db, {key, [value | val]}) - [{_, val}] -> :ets.insert(db, {key, [value, val]}) + [] -> + :ets.insert(db, {key, [value]}) + + [{_, val} = old] when is_list(val) -> + new = fun.(old, value) + 1 = :ets.select_replace(db, [{old, [], [{:const, new}]}]) end {:noreply, state} end + + defp do_get(db, key) do + case :ets.lookup(db, key) do + [] -> :not_found + [other] -> other + end + end + + defp get_stream(db) do + db + |> :ets.first() + |> Stream.iterate(&:ets.next(db, &1)) + |> Stream.take_while(fn + :"$end_of_table" -> false + _ -> true + end) + |> Stream.map(fn key -> + do_get(db, key) + end) + end end diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index dfc1420..70825ad 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -117,7 +117,7 @@ defmodule ClusterKV.Ring do @ready, %SL{data: %Ring{db: db}} = sl ) do - {:ok, sl, [{:reply, from, do_stream(db)}]} + {:ok, sl, [{:reply, from, DB.stream(db)}]} end def handle_call( @@ -127,7 +127,6 @@ defmodule ClusterKV.Ring do %SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl ) do node = get_node(key, r, me, repls) - ref = make_ref() send({n, node}, {:get_key, key, ref, me}) {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []} @@ -185,22 +184,9 @@ defmodule ClusterKV.Ring do @ready, %SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl ) do - parts = - key - |> String.split(split_on) - - Enum.take_while(Enum.with_index(parts), fn {k, i} -> - case k == wildcard do - true -> - true - - false -> - k = Enum.join([header, k, i], join) - nodes = HashRing.key_to_nodes(r, k, repls) - send_sync(n, nodes, k, {parts, value}) - false - end - end) + key + |> String.split(split_on) + |> send_wildcard(header, wildcard, join, value, n, r, repls) {:ok, sl, []} end @@ -219,15 +205,12 @@ defmodule ClusterKV.Ring do def handle_cast(_, _, sl), do: {:ok, sl, []} - def handle_info({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do - Enum.each(keys, fn {key, value} -> - DB.put_or_update(db, key, value) - end) - + def handle_info({:sync, key, value}, @ready, %SL{data: %Ring{db: db}} = sl) do + DB.upsert(db, key, value) {:ok, sl, []} end - def handle_info({:sync, _keys}, _, sl), do: {:ok, sl, []} + def handle_info({:sync, _key, _v}, _, sl), do: {:ok, sl, []} def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do val = DB.get(db, key) @@ -240,23 +223,13 @@ defmodule ClusterKV.Ring do @ready, %SL{data: %Ring{name: n, db: db}} = sl ) do - vals = DB.get(db, key) - Logger.info("Got Wildcard Value: #{inspect(vals)}") - - vals = - case vals do + val = + case DB.get(db, key) do :not_found -> :not_found - vals -> + {_k, vals} -> parts = Enum.with_index(parts) - {_k, vals} = vals - - vals = - case vals do - v when is_list(v) -> v - v -> [v] - end Enum.filter(vals, fn {p, _val} -> Logger.info("Checking wilcard vals #{inspect(p)} against #{inspect(parts)}") @@ -264,21 +237,14 @@ defmodule ClusterKV.Ring do Enum.all?(parts, fn {k, i} -> case Enum.at(p, i) do ^k -> true - w when w == wildcard -> true + ^wildcard -> true _ -> false end end) end) end - vals = - case vals do - [] -> :not_found - [o] -> o - o -> o - end - - send({n, node}, {:reply, vals, ref}) + send({n, node}, {:reply, val, ref}) {:ok, sl, []} end @@ -290,22 +256,22 @@ defmodule ClusterKV.Ring do def handle_info( {:nodeup, node, info}, _, - %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl + %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl ) do Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}") ring = HashRing.add_node(r, node) - sync_db(n, me, db, ring, repls) + # sync_db(n, me, db, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]} end def handle_info( {:nodedown, node, info}, _, - %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl + %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) - sync_db(n, me, db, ring, repls) + # sync_db(n, me, db, ring, repls) {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} end @@ -320,47 +286,17 @@ defmodule ClusterKV.Ring do send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me}) end - defp get_node(key, r, node, repls) do - nodes = HashRing.key_to_nodes(r, key, repls) - - case node in nodes do - true -> node - false -> Enum.random(nodes) - end - end - - defp do_stream(db) do - db - |> DB.stream() - |> Stream.take_while(fn - :"$end_of_table" -> false - _ -> true - end) - |> Stream.map(fn key -> - DB.get(db, key) - end) - end + def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do + parts + |> Enum.with_index() + |> Enum.take_while(fn + {^wildcard, _} -> + true - defp sync_db(name, me, db, ring, repls) do - Task.start(fn -> - db - |> do_stream() - |> Stream.chunk_every(10) - |> Stream.each(fn chunk -> - Enum.each(chunk, fn {key, v} -> - nodes = - ring - |> HashRing.key_to_nodes(key, repls) - |> Enum.filter(fn - ^me -> false - _ -> true - end) - - Logger.debug("Sync: #{key}: #{inspect(v)}") - send_sync(name, nodes, key, v) - end) - end) - |> Stream.run() + {k, i} -> + k = Enum.join([header, k, i], join) + nodes = HashRing.key_to_nodes(ring, k, repls) + send_sync(name, nodes, k, {parts, value}) end) end @@ -379,6 +315,7 @@ defmodule ClusterKV.Ring do defp get_final_value(res, val) when is_list(res), do: [val | res] + |> List.flatten() |> Enum.filter(fn :not_found -> false _ -> true @@ -407,7 +344,7 @@ defmodule ClusterKV.Ring do defp send_sync(name, nodes, key, value) do Enum.each(nodes, fn n -> - send({name, n}, {:sync, [{key, value}]}) + send({name, n}, {:sync, key, value}) end) end @@ -416,4 +353,13 @@ defmodule ClusterKV.Ring do HashRing.add_node(acc, n) end) end + + defp get_node(key, r, node, repls) do + nodes = HashRing.key_to_nodes(r, key, repls) + + case node in nodes do + true -> node + false -> Enum.random(nodes) + end + end end