node = get_node(key, r, me, repls)
ref = make_ref()
send({n, node}, {:get_key, key, ref, me})
- {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []}
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []}
end
def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
parts = String.split(key, split_on)
head = Enum.slice(parts, 0..(min - 1))
itr = Enum.slice(parts, min..-2)
- get_prefix(head, r, me, repls, n, ref)
+ node = get_prefix(head, r, me, repls, n, ref)
- Enum.reduce(itr, head, fn ns, acc ->
- next = acc ++ [ns]
- get_prefix(next, r, me, repls, n, ref)
- next
- end)
+ {_, nodes} =
+ Enum.reduce(itr, {head, [node]}, fn ns, {acc, nodes} ->
+ next = acc ++ [ns]
+ node = get_prefix(next, r, me, repls, n, ref)
+ {next, [node | nodes]}
+ end)
- {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(itr) + 1, []} | reqs]}}, []}
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []}
end
def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
parts = String.split(parts, split_on)
- Enum.each(Enum.with_index(parts), fn {k, i} ->
- k = Enum.join([header, k, i], join)
- get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
- end)
+ nodes =
+ Enum.map(Enum.with_index(parts), fn {k, i} ->
+ k = Enum.join([header, k, i], join)
+ get_wildcard(k, parts, wildcard, r, me, repls, n, ref)
+ end)
- {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(parts), []} | reqs]}}, []}
+ {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []}
end
def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
{:ok, sl, []}
end
- def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db}} = sl) do
+ def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do
val = DB.get(db, key)
- send({n, node}, {:reply, val, ref})
+ send({n, node}, {:reply, val, ref, me})
{:ok, sl, []}
end
def handle_info(
{:get_wildcard, key, parts, wildcard, ref, node},
@ready,
- %SL{data: %Ring{name: n, db: db}} = sl
+ %SL{data: %Ring{name: n, db: db, node: me}} = sl
) do
val =
case DB.get(db, key) do
end)
end
- send({n, node}, {:reply, val, ref})
+ send({n, node}, {:reply, val, ref, me})
{:ok, sl, []}
end
- def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
- {requests, actions} = maybe_reply(ref, reqs, val)
+ def handle_info({:reply, val, ref, from}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do
+ {requests, actions} = maybe_reply(ref, reqs, val, from)
{:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions}
end
def handle_info(
{:nodedown, node, info},
_,
- %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl
+ %SL{
+ data: %Ring{name: n, requests: reqs, node: me, db: db, ring: r, replicas: repls} = data
+ } = sl
) do
Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.remove_node(r, node)
redistribute_data(n, me, db, r, ring, repls)
- {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
+ reqs = remove_node_from_requests(reqs, node)
+
+ {requests, actions} =
+ Enum.reduce(reqs, {reqs, [{:next_event, :internal, :node_down}]}, fn {ref, _fr, _nodes,
+ _res},
+ {rs, acts} ->
+ {requests, actions} = maybe_reply(ref, rs, :not_found, node)
+ {requests, actions ++ acts}
+ end)
+
+ {:ok, %SL{sl | data: %Ring{data | ring: ring, requests: requests}}, actions}
end
def handle_info({:handle_batch, batch}, _, %SL{data: %Ring{db: db}} = sl) do
prefix = Enum.join(next, ".")
node = get_node(prefix, r, me, repls)
send({n, node}, {:get_key, prefix, ref, me})
+ node
end
defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do
node = get_node(key, r, me, repls)
send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me})
+ node
end
def send_batch(name, batches) do
- Logger.info("Sending batches to: #{inspect(Map.keys(batches))}")
+ Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}")
Enum.each(batches, fn {node, batch} ->
- Logger.info("Sending #{length(batch)} reccords to #{inspect(node)}")
+ Logger.debug("Sending #{length(batch)} reccords to #{inspect(node)}")
send({name, node}, {:handle_batch, batch})
end)
end
end)
end
- defp maybe_reply(ref, reqs, val) do
+ defp maybe_reply(ref, reqs, val, node) do
case get_request(ref, reqs) do
- {_, from, 1, res} ->
+ {_, from, [_], res} ->
+ {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
+
+ {_, from, [], res} ->
{remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]}
_ ->
- {decrement_request(ref, reqs, val), []}
+ {decrement_request(ref, reqs, val, node), []}
end
end
end)
end
- defp decrement_request(ref, reqs, val) do
+ defp remove_node_from_requests(reqs, node) do
+ Enum.map(reqs, fn {ref, fr, nodes, res} ->
+ {ref, fr, List.delete(nodes, node), res}
+ end)
+ end
+
+ defp decrement_request(ref, reqs, val, from) do
Enum.map(reqs, fn
- {^ref, f, c, res} -> {ref, f, c - 1, [val | res]}
+ {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]}
other -> other
end)
end