:quorum,
:ring,
:db,
+ :node,
requests: [],
anti_entropy_interval: 5 * 60_000
]
replicas: integer(),
quorum: integer(),
db: module(),
+ node: node(),
ring: HashRing.t(),
anti_entropy_interval: integer()
}
end
def handle_resource(@handle_init, _, @init, %SL{data: %Ring{} = data} = sl) do
+ node = Node.self()
+
ring =
- Node.self()
+ node
|> HashRing.new()
|> get_existing_nodes()
:net_kernel.monitor_nodes(true, node_type: :all)
- {:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :initialized}]}
+
+ {:ok, %SL{sl | data: %Ring{data | node: node, ring: ring}},
+ [{:next_event, :internal, :initialized}]}
end
def handle_resource(@await_quorum, _, @quorum, %SL{data: %Ring{quorum: q, ring: r}} = sl) do
{:get, key},
from,
@ready,
- %SL{data: %Ring{name: n, requests: reqs, ring: r} = data} = sl
+ %SL{data: %Ring{name: n, node: node, requests: reqs, ring: r, replicas: repls} = data} =
+ sl
) do
- node = HashRing.key_to_node(r, key)
+ nodes = HashRing.key_to_nodes(r, key, repls)
+
+ node =
+ case node in nodes do
+ true -> node
+ false -> Enum.random(nodes)
+ end
+
ref = make_ref()
- dest = {n, node}
- send(dest, {:get_key, key, ref, Node.self()})
+ send({n, node}, {:get_key, key, ref, Node.self()})
{:ok, %SL{sl | data: %Ring{data | requests: [{ref, from} | reqs]}}, []}
end
+ def handle_call({:get, key}, from, _, sl), do: {:ok, sl, [{:reply, from, :no_quorum}]}
+
def handle_cast(
{:put, key, value},
@ready,
{:ok, sl, []}
end
+ def handle_cast({:put, key, value}, _, sl), do: {:ok, sl, []}
+
def handle_cast({:sync, keys}, @ready, %SL{data: %Ring{db: db}} = sl) do
Enum.each(keys, fn {key, value} ->
DB.put(db, key, value)
{:ok, sl, []}
end
+ def handle_cast({:sync, keys}, _, sl), do: {:ok, sl, []}
+
def handle_info({:get_key, key, ref, node}, @ready, %SL{data: %Ring{name: n, db: db}} = sl) do
val = DB.get(db, key)
send({n, node}, {:reply, val, ref})