# States
@init "Init"
@ready "Ready"
- @quorum_node_up "QuorumNodeUp"
- @quorum_node_down "QuorumNodeDown"
- @ready_node_up "ReadyNodeUp"
- @ready_node_down "ReadyNodeDown"
@quorum "Quorum"
@anti_entropy "AntiEntropy"
# Resources
@handle_init "HandleInit"
@handle_ready "HandleReady"
- @handle_node_up "HandleNodeUp"
@await_quorum "AwaitQuorum"
- @handle_node_down "HandleNodeDown"
@handle_anti_entropy "HandleAntiEntropy"
def put(name, key, value) do
{:ok, sl, actions}
end
- def handle_resource(@handle_ready, _, @ready, sl) do
+ def handle_resource(@handle_ready, _, @ready, %SL{data: %Ring{name: name, node: me}} = sl) do
Logger.info("Ready")
- {:ok, sl, []}
- end
+ nodes = Node.list()
- def handle_resource(@handle_node_up, _, _, sl) do
- {:ok, sl, [{:next_event, :internal, :node_added}]}
- end
+ Enum.each(nodes, fn n ->
+ send({name, n}, {:init_node, me})
+ end)
- def handle_resource(@handle_node_down, _, _, sl) do
- {:ok, sl, [{:next_event, :internal, :node_removed}]}
+ {:ok, sl, []}
end
def handle_call(
def handle_info(
{:nodeup, node, info},
_,
- %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
+ %SL{data: %Ring{ring: r} = data} = sl
) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.add_node(r, node)
- redistribute_data(n, db, r, ring, repls)
+ # wait for node to be ready for data, they will send us an :init_node message
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
{:ok, sl, []}
end
+ def handle_info({:init_node, node}, _, %SL{data: %Ring{name: n, ring: r, db: db}}) do
+ batch =
+ db
+ |> DB.stream()
+ |> Enum.reduce(%{}, fn {k, v}, acc ->
+ d_node = HashRing.key_to_node(r, k)
+
+ case d_node do
+ ^node -> Map.update(acc, node, [{k, v}], &[{k, v}, &1])
+ _ -> acc
+ end
+ end)
+
+ send_batch(n, batch)
+ end
+
def handle_info(e, s, sl) do
Logger.info("Unknown Info Event: #{inspect(e)} in state #{s} with data #{inspect(sl)}")
{:ok, sl, []}
"Type": "Choice",
"Resource": "HandleReady",
"Choices": [
- {
- "StringEquals": ":node_up",
- "Next": "ReadyNodeUp"
- },
- {
- "StringEquals": ":node_down",
- "Next": "ReadyNodeDown"
- },
{
"StringEquals": ":anti_entropy",
"Next": "AntiEntropy"
"InputPath": "",
"OutputPath": ""
},
- "QuorumNodeUp": {
- "Type": "Task",
- "Resource": "HandleNodeUp",
- "TransitionEvent": ":node_added",
- "Catch": [],
- "InputPath": "",
- "OutputPath": "",
- "ResourcePath": "",
- "Next": "Quorum",
- "End": false
- },
- "ReadyNodeUp": {
- "Type": "Task",
- "Resource": "HandleNodeUp",
- "TransitionEvent": ":node_added",
- "Catch": [],
- "InputPath": "",
- "OutputPath": "",
- "ResourcePath": "",
- "Next": "Ready",
- "End": false
- },
"Quorum": {
"Type": "Choice",
"Resource": "AwaitQuorum",
{
"StringEquals": ":quorum",
"Next": "Ready"
- },
- {
- "StringEquals": ":node_up",
- "Next": "QuorumNodeUp"
- },
- {
- "StringEquals": ":node_down",
- "Next": "QuorumNodeDown"
}
],
"InputPath": "",
"OutputPath": ""
},
- "ReadyNodeDown": {
- "Type": "Task",
- "Resource": "HandleNodeDown",
- "TransitionEvent": ":node_removed",
- "Catch": [],
- "InputPath": "",
- "OutputPath": "",
- "ResourcePath": "",
- "Next": "Quorum",
- "End": false
- },
- "QuorumNodeDown": {
- "Type": "Task",
- "Resource": "HandleNodeDown",
- "TransitionEvent": ":node_removed",
- "Catch": [],
- "InputPath": "",
- "OutputPath": "",
- "ResourcePath": "",
- "Next": "Quorum",
- "End": false
- },
"AntiEntropy": {
"Type": "Task",
"Resource": "HandleAntiEntropy",