From bd8d01d2760121699f65cc6f756c5d48860583a0 Mon Sep 17 00:00:00 2001 From: Christopher Date: Mon, 9 Mar 2020 09:31:40 -0500 Subject: [PATCH] track nodes in requests so we can remove from current requests if node goes down --- lib/cluster_kv/ring.ex | 81 +++++++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/lib/cluster_kv/ring.ex b/lib/cluster_kv/ring.ex index e2442c9..f189fc0 100644 --- a/lib/cluster_kv/ring.ex +++ b/lib/cluster_kv/ring.ex @@ -136,7 +136,7 @@ defmodule ClusterKV.Ring do node = get_node(key, r, me, repls) ref = make_ref() send({n, node}, {:get_key, key, ref, me}) - {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, 1, []} | reqs]}}, []} + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, [node], []} | reqs]}}, []} end def handle_call({:get, _key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} @@ -151,15 +151,16 @@ defmodule ClusterKV.Ring do parts = String.split(key, split_on) head = Enum.slice(parts, 0..(min - 1)) itr = Enum.slice(parts, min..-2) - get_prefix(head, r, me, repls, n, ref) + node = get_prefix(head, r, me, repls, n, ref) - Enum.reduce(itr, head, fn ns, acc -> - next = acc ++ [ns] - get_prefix(next, r, me, repls, n, ref) - next - end) + {_, nodes} = + Enum.reduce(itr, {head, [node]}, fn ns, {acc, nodes} -> + next = acc ++ [ns] + node = get_prefix(next, r, me, repls, n, ref) + {next, [node | nodes]} + end) - {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(itr) + 1, []} | reqs]}}, []} + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []} end def handle_call({:prefix, _, _, _}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} @@ -176,12 +177,13 @@ defmodule ClusterKV.Ring do parts = String.split(parts, split_on) - Enum.each(Enum.with_index(parts), fn {k, i} -> - k = Enum.join([header, k, i], join) - get_wildcard(k, parts, wildcard, r, me, repls, n, ref) - end) + nodes = + Enum.map(Enum.with_index(parts), fn {k, i} -> + k = Enum.join([header, k, i], join) + get_wildcard(k, parts, wildcard, r, me, repls, n, ref) + end) - {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, length(parts), []} | reqs]}}, []} + {:ok, %SL{sl | data: %Ring{data | requests: [{ref, from, nodes, []} | reqs]}}, []} end def handle_call(_, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]} @@ -235,16 +237,16 @@ defmodule ClusterKV.Ring do {:ok, sl, []} end - def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db}} = sl) do + def handle_info({:get_key, key, ref, node}, _, %SL{data: %Ring{name: n, db: db, node: me}} = sl) do val = DB.get(db, key) - send({n, node}, {:reply, val, ref}) + send({n, node}, {:reply, val, ref, me}) {:ok, sl, []} end def handle_info( {:get_wildcard, key, parts, wildcard, ref, node}, @ready, - %SL{data: %Ring{name: n, db: db}} = sl + %SL{data: %Ring{name: n, db: db, node: me}} = sl ) do val = case DB.get(db, key) do @@ -265,12 +267,12 @@ defmodule ClusterKV.Ring do end) end - send({n, node}, {:reply, val, ref}) + send({n, node}, {:reply, val, ref, me}) {:ok, sl, []} end - def handle_info({:reply, val, ref}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do - {requests, actions} = maybe_reply(ref, reqs, val) + def handle_info({:reply, val, ref, from}, @ready, %SL{data: %Ring{requests: reqs} = data} = sl) do + {requests, actions} = maybe_reply(ref, reqs, val, from) {:ok, %SL{sl | data: %Ring{data | requests: requests}}, actions} end @@ -288,12 +290,24 @@ defmodule ClusterKV.Ring do def handle_info( {:nodedown, node, info}, _, - %SL{data: %Ring{name: n, node: me, db: db, ring: r, replicas: repls} = data} = sl + %SL{ + data: %Ring{name: n, requests: reqs, node: me, db: db, ring: r, replicas: repls} = data + } = sl ) do Logger.info("Nodedown: #{inspect(node)} - #{inspect(info)}") ring = HashRing.remove_node(r, node) redistribute_data(n, me, db, r, ring, repls) - {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]} + reqs = remove_node_from_requests(reqs, node) + + {requests, actions} = + Enum.reduce(reqs, {reqs, [{:next_event, :internal, :node_down}]}, fn {ref, _fr, _nodes, + _res}, + {rs, acts} -> + {requests, actions} = maybe_reply(ref, rs, :not_found, node) + {requests, actions ++ acts} + end) + + {:ok, %SL{sl | data: %Ring{data | ring: ring, requests: requests}}, actions} end def handle_info({:handle_batch, batch}, _, %SL{data: %Ring{db: db}} = sl) do @@ -311,18 +325,20 @@ defmodule ClusterKV.Ring do prefix = Enum.join(next, ".") node = get_node(prefix, r, me, repls) send({n, node}, {:get_key, prefix, ref, me}) + node end defp get_wildcard(key, parts, wildcard, r, me, repls, n, ref) do node = get_node(key, r, me, repls) send({n, node}, {:get_wildcard, key, parts, wildcard, ref, me}) + node end def send_batch(name, batches) do - Logger.info("Sending batches to: #{inspect(Map.keys(batches))}") + Logger.debug("Sending batches to: #{inspect(Map.keys(batches))}") Enum.each(batches, fn {node, batch} -> - Logger.info("Sending #{length(batch)} reccords to #{inspect(node)}") + Logger.debug("Sending #{length(batch)} reccords to #{inspect(node)}") send({name, node}, {:handle_batch, batch}) end) end @@ -364,13 +380,16 @@ defmodule ClusterKV.Ring do end) end - defp maybe_reply(ref, reqs, val) do + defp maybe_reply(ref, reqs, val, node) do case get_request(ref, reqs) do - {_, from, 1, res} -> + {_, from, [_], res} -> + {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]} + + {_, from, [], res} -> {remove_request(ref, reqs), [{:reply, from, get_final_value(res, val)}]} _ -> - {decrement_request(ref, reqs, val), []} + {decrement_request(ref, reqs, val, node), []} end end @@ -392,9 +411,15 @@ defmodule ClusterKV.Ring do end) end - defp decrement_request(ref, reqs, val) do + defp remove_node_from_requests(reqs, node) do + Enum.map(reqs, fn {ref, fr, nodes, res} -> + {ref, fr, List.delete(nodes, node), res} + end) + end + + defp decrement_request(ref, reqs, val, from) do Enum.map(reqs, fn - {^ref, f, c, res} -> {ref, f, c - 1, [val | res]} + {^ref, f, c, res} -> {ref, f, List.delete(c, from), [val | res]} other -> other end) end -- 2.45.3