alias __MODULE__
- @db_options [:set]
+ @db_options [:set, :public]
defstruct [:db]
GenServer.cast(name, {:put, key, value})
end
- @spec put_or_update(name :: module(), key :: binary(), value :: term()) :: :ok
- def put_or_update(name, key, value) do
- GenServer.cast(name, {:put_or_update, key, value})
+ @spec upsert(name :: module(), key :: binary(), value :: term(), fun()) :: :ok
+ def upsert(name, key, value, fun \\ &{elem(&1, 0), Enum.uniq([&2 | elem(&1, 1)])}) do
+ GenServer.cast(name, {:upsert, key, value, fun})
end
def stream(name) do
end
def handle_call(:stream, _from, %DB{db: db} = state) do
- {:reply, Stream.iterate(:ets.first(db), &:ets.next(db, &1)), state}
+ {:reply, get_stream(db), state}
end
def handle_call({:get, key}, _from, %DB{db: db} = state) do
- {:reply,
- case :ets.lookup(db, key) do
- [] -> :not_found
- [other] -> other
- end, state}
+ {:reply, do_get(db, key), state}
end
def handle_cast({:put, key, value}, %DB{db: db} = state) do
- :ets.insert(db, {key, value})
+ :ets.insert(db, {key, [value]})
{:noreply, state}
end
- def handle_cast({:put_or_update, key, value}, %DB{db: db} = state) do
+ def handle_cast({:upsert, key, value, fun}, %DB{db: db} = state) do
case :ets.lookup(db, key) do
- [] -> :ets.insert(db, {key, value})
- [{_, val}] when is_list(val) -> :ets.insert(db, {key, [value | val]})
- [{_, val}] -> :ets.insert(db, {key, [value, val]})
+ [] ->
+ :ets.insert(db, {key, [value]})
+
+ [{_, val} = old] when is_list(val) ->
+ new = fun.(old, value)
+ 1 = :ets.select_replace(db, [{old, [], [{:const, new}]}])
end
{:noreply, state}
end
+
+ defp do_get(db, key) do
+ case :ets.lookup(db, key) do
+ [] -> :not_found
+ [other] -> other
+ end
+ end
+
+ defp get_stream(db) do
+ db
+ |> :ets.first()
+ |> Stream.iterate(&:ets.next(db, &1))
+ |> Stream.take_while(fn
+ :"$end_of_table" -> false
+ _ -> true
+ end)
+ |> Stream.map(fn key ->
+ do_get(db, key)
+ end)
+ end
end
@ready,
%SL{data: %Ring{db: db}} = sl
) do
- {:ok, sl, [{:reply, from, do_stream(db)}]}
+ {:ok, sl, [{:reply, from, DB.stream(db)}]}
end
def handle_call(
%SL{data: %Ring{name: n, node: me, requests: reqs, ring: r, replicas: repls} = data} = sl
) do
node = get_node(key, r, me, repls)
-
ref = make_ref()
send({n, node}, {:get_key, key, ref, me})
{:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []}
@ready,
%SL{data: %Ring{name: n, ring: r, replicas: repls}} = sl
) do
- parts =
- key
- |> String.split(split_on)
-
- Enum.take_while(Enum.with_index(parts), fn {k, i} ->
- case k == wildcard do
- true ->
- true
-
- false ->
- k = Enum.join([header, k, i], join)
- nodes = HashRing.key_to_nodes(r, k, repls)
- send_sync(n, nodes, k, {parts, value})
- false
- end
- end)
+ key
+ |> String.split(split_on)
+ |> send_wildcard(header, wildcard, join, value, n, r, repls)
{:ok, sl, []}
end
def handle_cast(_, _, sl), do: {:ok, sl, []}
- def handle_info({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
- Enum.each(keys, fn {key, value} ->
- DB.put_or_update(db, key, value)
- end)
-
+ def handle_info({:sync, key, value}, @ready, %SL{data: %Ring{db: db}} = sl) do
+ DB.upsert(db, key, value)
{:ok, sl, []}
end
- def handle_info({:sync, _keys}, _, sl), do: {:ok, sl, []}
+ def handle_info({:sync, _key, _v}, _, sl), do: {:ok, sl, []}
def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do
val = DB.get(db, key)
@ready,
%SL{data: %Ring{name: n, db: db}} = sl
) do
- vals = DB.get(db, key)
- Logger.info("Got Wildcard Value: #{inspect(vals)}")
-
- vals =
- case vals do
+ val =
+ case DB.get(db, key) do
:not_found ->
:not_found
- vals ->
+ {_k, vals} ->
parts = Enum.with_index(parts)
- {_k, vals} = vals
-
- vals =
- case vals do
- v when is_list(v) -> v
- v -> [v]
- end
Enum.filter(vals, fn {p, _val} ->
Logger.info("Checking wilcard vals #{inspect(p)} against #{inspect(parts)}")
Enum.all?(parts, fn {k, i} ->
case Enum.at(p, i) do
^k -> true
- w when w == wildcard -> true
+ ^wildcard -> true
_ -> false
end
end)
end)
end
- vals =
- case vals do
- [] -> :not_found
- [o] -> o
- o -> o
- end
-
- send({n, node}, {:reply, vals, ref})
+ send({n, node}, {:reply, val, ref})
{:ok, sl, []}
end
def handle_info(
{:nodeup, node, info},
_,
- %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+ %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.add_node(r, node)
- sync_db(n, me, db, ring, repls)
+ # sync_db(n, me, db, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
def handle_info(
{:nodedown, node, info},
_,
- %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+ %SL{data: %Ring{name: _n, node: _me, db: _db, ring: r, replicas: _repls} = data} = sl
) do
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.remove_node(r, node)
- sync_db(n, me, db, ring, repls)
+ # sync_db(n, me, db, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
end
- defp get_node(key, r, node, repls) do
- nodes = HashRing.key_to_nodes(r, key, repls)
-
- case node in nodes do
- true -> node
- false -> Enum.random(nodes)
- end
- end
-
- defp do_stream(db) do
- db
- |> DB.stream()
- |> Stream.take_while(fn
- :"$end_of_table" -> false
- _ -> true
- end)
- |> Stream.map(fn key ->
- DB.get(db, key)
- end)
- end
+ def send_wildcard(parts, header, wildcard, join, value, name, ring, repls) do
+ parts
+ |> Enum.with_index()
+ |> Enum.take_while(fn
+ {^wildcard, _} ->
+ true
- defp sync_db(name, me, db, ring, repls) do
- Task.start(fn ->
- db
- |> do_stream()
- |> Stream.chunk_every(10)
- |> Stream.each(fn chunk ->
- Enum.each(chunk, fn {key, v} ->
- nodes =
- ring
- |> HashRing.key_to_nodes(key, repls)
- |> Enum.filter(fn
- ^me -> false
- _ -> true
- end)
-
- Logger.debug("Sync: #{key}: #{inspect(v)}")
- send_sync(name, nodes, key, v)
- end)
- end)
- |> Stream.run()
+ {k, i} ->
+ k = Enum.join([header, k, i], join)
+ nodes = HashRing.key_to_nodes(ring, k, repls)
+ send_sync(name, nodes, k, {parts, value})
end)
end
defp get_final_value(res, val) when is_list(res),
do:
[val | res]
+ |> List.flatten()
|> Enum.filter(fn
:not_found -> false
_ -> true
defp send_sync(name, nodes, key, value) do
Enum.each(nodes, fn n ->
- send({name, n}, {:sync, [{key, value}]})
+ send({name, n}, {:sync, key, value})
end)
end
HashRing.add_node(acc, n)
end)
end
+
+ defp get_node(key, r, node, repls) do
+ nodes = HashRing.key_to_nodes(r, key, repls)
+
+ case node in nodes do
+ true -> node
+ false -> Enum.random(nodes)
+ end
+ end
end