@handle_anti_entropy "HandleAntiEntropy"
def put(name, key, value) do
- Logger.debug("PUT - #{key}: #{inspect(value)}")
:gen_statem.cast(ClusterKV.ring_name(name), {:put, key, value})
end
def get(name, key, timeout \\ 5000) do
- Logger.debug("GET - #{key}")
:gen_statem.call(ClusterKV.ring_name(name), {:get, key}, timeout)
end
def prefix(name, key, split_on, min) do
- Logger.debug("PREFIX - #{key} / #{inspect(split_on)} : #{min}")
:gen_statem.call(ClusterKV.ring_name(name), {:prefix, key, split_on, min})
end
parts = String.split(key, split_on)
head = Enum.slice(parts, 0..(min - 1))
itr = Enum.slice(parts, min..-2)
- get_prefix(head, key, r, me, repls, n, ref)
+ get_prefix(head, r, me, repls, n, ref)
Enum.reduce(itr, head, fn ns, acc ->
next = acc ++ [ns]
- get_prefix(next, key, r, me, repls, n, ref)
+ get_prefix(next, r, me, repls, n, ref)
next
end)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_down}]}
end
- defp get_prefix(next, key, r, me, repls, n, ref) do
+ defp get_prefix(next, r, me, repls, n, ref) do
prefix = Enum.join(next, ".")
- node = get_node(key, r, me, repls)
+ node = get_node(prefix, r, me, repls)
send({n, node}, {:get_key, prefix, ref, me})
end
- defp get_node(key, r, _node, repls) do
+ defp get_node(key, r, node, repls) do
nodes = HashRing.key_to_nodes(r, key, repls)
- Enum.random(nodes)
- # case node in nodes do
- # true -> node
- # false -> Enum.random(nodes)
- # end
+
+ case node in nodes do
+ true -> node
+ false -> Enum.random(nodes)
+ end
end
defp do_stream(db) do
defp send_sync(name, nodes, key, value) do
Enum.each(nodes, fn n ->
- Logger.info("Sending #{key} to #{name} on #{n}")
send({name, n}, {:sync, [{key, value}]})
end)
end