children = [
{DB, [name: db]},
- {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]},
- {Ring, [{:local, ring}, ring_data, []]}
+ {Ring, [{:local, ring}, ring_data, []]},
+ {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}
]
Supervisor.init(children, strategy: :rest_for_one, max_restarts: 5, max_seconds: 10)
:ring,
:db,
:node,
- init: true,
requests: [],
anti_entropy_interval: 5 * 60_000
]
quorum: integer(),
db: module(),
node: node(),
- init: boolean(),
ring: HashRing.t(),
anti_entropy_interval: integer()
}
@handle_ready,
_,
@ready,
- %SL{data: %Ring{init: false}} = sl
+ sl
) do
Logger.info("Ready")
{:ok, sl, []}
end
- def handle_resource(
- @handle_ready,
- _,
- @ready,
- %SL{data: %Ring{init: true, name: name, node: me} = data} = sl
- ) do
- Logger.info("Ready")
- nodes = Node.list()
-
- Enum.each(nodes, fn n ->
- send({name, n}, {:init_node, me})
- end)
-
- {:ok, %SL{sl | data: %Ring{data | init: false}}, []}
- end
-
def handle_resource(@handle_node_up, _, _, sl) do
{:ok, sl, [{:next_event, :internal, :node_added}]}
end
def handle_info(
{:nodeup, node, info},
_,
- %SL{data: %Ring{ring: r} = data} = sl
+ %SL{data: %Ring{name: n, db: db, ring: r, replicas: repls} = data} = sl
) do
Logger.info("Nodeup: #{inspect(node)} - #{inspect(info)}")
ring = HashRing.add_node(r, node)
- # wait for node to be ready for data, they will send us an :init_node message
+ redistribute_data(n, db, r, ring, repls)
{:ok, %SL{sl | data: %Ring{data | ring: ring}}, [{:next_event, :internal, :node_up}]}
end
{:ok, sl, []}
end
- def handle_info({:init_node, node}, _, %SL{data: %Ring{name: n, ring: r, db: db}} = sl) do
- batch =
- db
- |> DB.stream()
- |> Enum.reduce(%{}, fn {k, v}, acc ->
- d_node = HashRing.key_to_node(r, k)
-
- case d_node do
- ^node -> Map.update(acc, node, [{k, v}], &[{k, v} | &1])
- _ -> acc
- end
- end)
-
- send_batch(n, batch)
- {:ok, sl, []}
- end
-
def handle_info(e, s, sl) do
Logger.info("Unknown Info Event: #{inspect(e)} in state #{s} with data #{inspect(sl)}")
{:ok, sl, []}