diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index ce5c5368ae..125cc14189 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -12,21 +12,20 @@ using namespace std::chrono_literals; /* - * bootstrap_ascending + * account_scan */ -nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) : +nano::bootstrap_ascending::account_scan::account_scan (const nano::bootstrap_ascending_config & config_a, nano::bootstrap_ascending::service & service_a, nano::ledger & ledger_a, nano::network_constants & network_consts_a, nano::block_processor & block_processor_a, nano::stats & stats_a) : config{ config_a }, - network_consts{ config.network_params.network }, - block_processor{ block_processor_a }, + service{ service_a }, ledger{ ledger_a }, - network{ network_a }, - stats{ stat_a }, + network_consts{ network_consts_a }, + block_processor{ block_processor_a }, + stats{ stats_a }, accounts{ stats }, iterator{ ledger.store }, throttle{ compute_throttle_size () }, - scoring{ config.bootstrap_ascending, config.network_params.network }, - database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 } + database_limiter{ config.database_requests_limit, 1.0 } { block_processor.batch_processed.add ([this] (auto const & batch) { bool should_notify = false; @@ -50,62 +49,123 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: }); } -nano::bootstrap_ascending::service::~service () +nano::bootstrap_ascending::account_scan::~account_scan () { // All threads must be stopped before destruction debug_assert (!thread.joinable ()); - debug_assert (!timeout_thread.joinable ()); } -void nano::bootstrap_ascending::service::start () +void nano::bootstrap_ascending::account_scan::start () { debug_assert (!thread.joinable ()); - debug_assert (!timeout_thread.joinable ()); thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); run (); }); - - timeout_thread = std::thread ([this] () { - nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); - run_timeouts (); - }); } -void nano::bootstrap_ascending::service::stop () +void nano::bootstrap_ascending::account_scan::stop () { - nano::unique_lock lock{ mutex }; - stopped = true; - lock.unlock (); + { + nano::lock_guard lock{ mutex }; + stopped = true; + } condition.notify_all (); - nano::join_or_pass (thread); - nano::join_or_pass (timeout_thread); } -std::size_t nano::bootstrap_ascending::service::priority_size () const +std::size_t nano::bootstrap_ascending::account_scan::priority_size () const { nano::lock_guard lock{ mutex }; return accounts.priority_size (); } -std::size_t nano::bootstrap_ascending::service::blocked_size () const +std::size_t nano::bootstrap_ascending::account_scan::blocked_size () const { nano::lock_guard lock{ mutex }; return accounts.blocked_size (); } -std::size_t nano::bootstrap_ascending::service::score_size () const +void nano::bootstrap_ascending::account_scan::process (const nano::asc_pull_ack::blocks_payload & response, const account_scan::tag & tag) +{ + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); + + auto result = tag.verify (response); + switch (result) + { + using enum account_scan::tag::verify_result; + + case ok: + { + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); + + for (auto & block : response.blocks) + { + block_processor.add (block, nano::block_processor::block_source::bootstrap); + } + + nano::lock_guard lock{ mutex }; + throttle.add (true); + } + break; + case nothing_new: + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new); + + nano::lock_guard lock{ mutex }; + accounts.priority_down (tag.account); + throttle.add (false); + } + break; + case invalid: + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid); + // TODO: Log + } + break; + } +} + +void nano::bootstrap_ascending::account_scan::cleanup () { nano::lock_guard lock{ mutex }; - return scoring.size (); + throttle.resize (compute_throttle_size ()); +} + +void nano::bootstrap_ascending::account_scan::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + lock.unlock (); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop); // TODO: Change stat type + run_one (); + lock.lock (); + throttle_if_needed (lock); + } +} + +void nano::bootstrap_ascending::account_scan::run_one () +{ + // Ensure there is enough space in blockprocessor for queuing new blocks + wait_blockprocessor (); + + // Waits for account either from priority queue or database + auto account = wait_available_account (); + if (account.is_zero ()) + { + return; + } + + account_scan::tag tag{ {}, account }; + service.request (tag); } /** Inspects a block that has been processed by the block processor - Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved - Marks an account as forwarded if it has been recently referenced by a block that has been inserted. */ -void nano::bootstrap_ascending::service::inspect (store::transaction const & tx, nano::process_return const & result, nano::block const & block) +void nano::bootstrap_ascending::account_scan::inspect (store::transaction const & tx, nano::process_return const & result, nano::block const & block) { debug_assert (!mutex.try_lock ()); @@ -172,27 +232,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx, } } -void nano::bootstrap_ascending::service::wait_blockprocessor () -{ - nano::unique_lock lock{ mutex }; - while (!stopped && block_processor.half_full ()) - { - condition.wait_for (lock, 500ms, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions - } -} - -std::shared_ptr nano::bootstrap_ascending::service::wait_available_channel () -{ - std::shared_ptr channel; - nano::unique_lock lock{ mutex }; - while (!stopped && !(channel = scoring.channel ())) - { - condition.wait_for (lock, 100ms, [this] () { return stopped; }); - } - return channel; -} - -nano::account nano::bootstrap_ascending::service::available_account () +nano::account nano::bootstrap_ascending::account_scan::available_account () { { auto account = accounts.next (); @@ -217,7 +257,7 @@ nano::account nano::bootstrap_ascending::service::available_account () return { 0 }; } -nano::account nano::bootstrap_ascending::service::wait_available_account () +nano::account nano::bootstrap_ascending::account_scan::wait_available_account () { nano::unique_lock lock{ mutex }; while (!stopped) @@ -236,87 +276,197 @@ nano::account nano::bootstrap_ascending::service::wait_available_account () return { 0 }; } -auto nano::bootstrap_ascending::service::wait_next () -> std::optional +void nano::bootstrap_ascending::account_scan::wait_blockprocessor () { - // Waits for account either from priority queue or database - auto account = wait_available_account (); - if (account.is_zero ()) + nano::unique_lock lock{ mutex }; + while (!stopped && block_processor.half_full ()) { - return {}; + condition.wait_for (lock, 500ms, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions } +} - account_scan::tag strategy{ {}, account }; - return strategy; +void nano::bootstrap_ascending::account_scan::throttle_if_needed (nano::unique_lock & lock) +{ + debug_assert (lock.owns_lock ()); + if (!iterator.warmup () && throttle.throttled ()) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::throttled); + condition.wait_for (lock, std::chrono::milliseconds{ config.throttle_wait }, [this] () { return stopped; }); + } } -bool nano::bootstrap_ascending::service::request (const tag_strategy_variant & strategy, std::shared_ptr & channel) +std::size_t nano::bootstrap_ascending::account_scan::compute_throttle_size () const { - async_tag tag{ strategy }; - tag.id = nano::bootstrap_ascending::generate_id (); - tag.time = std::chrono::steady_clock::now (); + // Scales logarithmically with ledger block + // Returns: config.throttle_coefficient * sqrt(block_count) + std::size_t size_new = config.throttle_coefficient * std::sqrt (ledger.cache.block_count.load ()); + return size_new == 0 ? 16 : size_new; +} - on_request.notify (tag, channel); +/** + * Verifies whether the received response is valid. Returns: + * - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain + * - nothing_new: when received response indicates that the account chain does not have more blocks + * - ok: otherwise, if all checks pass + */ +auto nano::bootstrap_ascending::account_scan::tag::verify (const nano::asc_pull_ack::blocks_payload & response) const -> verify_result +{ + auto const & blocks = response.blocks; - nano::asc_pull_req request{ network_consts }; - request.id = tag.id; - request.type = nano::asc_pull_type::blocks; + if (blocks.empty ()) + { + return verify_result::nothing_new; + } + if (blocks.size () == 1 && blocks.front ()->hash () == start.as_block_hash ()) + { + return verify_result::nothing_new; + } - request.payload = tag.prepare_request (*this); - request.update_header (); + auto const & first = blocks.front (); + switch (type) + { + case query_type::blocks_by_hash: + { + if (first->hash () != start.as_block_hash ()) + { + // TODO: Stat & log + return verify_result::invalid; + } + } + break; + case query_type::blocks_by_account: + { + // Open & state blocks always contain account field + if (first->account () != start.as_account ()) + { + // TODO: Stat & log + return verify_result::invalid; + } + } + break; + default: + debug_assert (false, "invalid type"); + return verify_result::invalid; + } - track (tag); + // Verify blocks make a valid chain + nano::block_hash previous_hash = blocks.front ()->hash (); + for (int n = 1; n < blocks.size (); ++n) + { + auto & block = blocks[n]; + if (block->previous () != previous_hash) + { + // TODO: Stat & log + return verify_result::invalid; // Blocks do not make a chain + } + previous_hash = block->hash (); + } - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + return verify_result::ok; +} - // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests - channel->send ( - request, nullptr, - nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); +/* + * bootstrap_ascending + */ - return true; // Request sent +nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stats_a) : + config{ config_a }, + network_consts{ config.network_params.network }, + block_processor{ block_processor_a }, + ledger{ ledger_a }, + network{ network_a }, + stats{ stats_a }, + scoring{ config.bootstrap_ascending, config.network_params.network } +{ } -bool nano::bootstrap_ascending::service::run_one () +nano::bootstrap_ascending::service::~service () { - // Ensure there is enough space in blockprocessor for queuing new blocks - wait_blockprocessor (); + // All threads must be stopped before destruction + debug_assert (!thread.joinable ()); + debug_assert (!timeout_thread.joinable ()); +} - // Waits for channel that is not full - auto channel = wait_available_channel (); - if (!channel) - { - return false; - } - auto strategy = wait_next (); - if (!strategy) +void nano::bootstrap_ascending::service::start () +{ + debug_assert (!thread.joinable ()); + debug_assert (!timeout_thread.joinable ()); + + timeout_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run_timeouts (); + }); +} + +void nano::bootstrap_ascending::service::stop () +{ + account_scan.stop (); { - return false; + nano::lock_guard lock{ mutex }; + stopped = true; } - bool success = request (*strategy, channel); - return success; + condition.notify_all (); + nano::join_or_pass (thread); + nano::join_or_pass (timeout_thread); } -void nano::bootstrap_ascending::service::throttle_if_needed (nano::unique_lock & lock) +std::size_t nano::bootstrap_ascending::service::score_size () const { - debug_assert (lock.owns_lock ()); - if (!iterator.warmup () && throttle.throttled ()) + nano::lock_guard lock{ mutex }; + return scoring.size (); +} + +void nano::bootstrap_ascending::service::request (const nano::bootstrap_ascending::service::tag_strategy_variant & strategy) +{ + auto channel = wait_available_channel (); + if (!channel) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::throttled); - condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); + return; } + + request (strategy, channel); } -void nano::bootstrap_ascending::service::run () +void nano::bootstrap_ascending::service::wait_next () { +} + +std::shared_ptr nano::bootstrap_ascending::service::wait_available_channel () +{ + std::shared_ptr channel; nano::unique_lock lock{ mutex }; - while (!stopped) + while (!stopped && !(channel = scoring.channel ())) { - lock.unlock (); - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop); - run_one (); - lock.lock (); - throttle_if_needed (lock); + condition.wait_for (lock, 100ms, [this] () { return stopped; }); } + return channel; +} + +bool nano::bootstrap_ascending::service::request (const tag_strategy_variant & strategy, std::shared_ptr & channel) +{ + async_tag tag{ strategy }; + tag.id = nano::bootstrap_ascending::generate_id (); + tag.time = std::chrono::steady_clock::now (); + + on_request.notify (tag, channel); + + nano::asc_pull_req request{ network_consts }; + request.id = tag.id; + request.type = nano::asc_pull_type::blocks; + + request.payload = tag.prepare_request (*this); + request.update_header (); + + track (tag); + + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + + // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests + channel->send ( + request, nullptr, + nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); + + return true; // Request sent } void nano::bootstrap_ascending::service::run_timeouts () @@ -326,7 +476,6 @@ void nano::bootstrap_ascending::service::run_timeouts () { scoring.sync (network.list ()); scoring.timeout (); - throttle.resize (compute_throttle_size ()); auto & tags_by_order = tags.get (); @@ -342,6 +491,12 @@ void nano::bootstrap_ascending::service::run_timeouts () stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout); } + lock.unlock (); + + account_scan.cleanup (); + + lock.lock (); + condition.wait_for (lock, 1s, [this] () { return stopped; }); } } @@ -406,104 +561,7 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const account_scan::tag & tag) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); - - auto result = tag.verify (response); - switch (result) - { - using enum account_scan::tag::verify_result; - - case ok: - { - stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); - - for (auto & block : response.blocks) - { - block_processor.add (block, nano::block_processor::block_source::bootstrap); - } - - nano::lock_guard lock{ mutex }; - throttle.add (true); - } - break; - case nothing_new: - { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new); - - nano::lock_guard lock{ mutex }; - accounts.priority_down (tag.account); - throttle.add (false); - } - break; - case invalid: - { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid); - // TODO: Log - } - break; - } -} - -/** - * Verifies whether the received response is valid. Returns: - * - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain - * - nothing_new: when received response indicates that the account chain does not have more blocks - * - ok: otherwise, if all checks pass - */ -auto nano::bootstrap_ascending::account_scan::tag::verify (const nano::asc_pull_ack::blocks_payload & response) const -> verify_result -{ - auto const & blocks = response.blocks; - - if (blocks.empty ()) - { - return verify_result::nothing_new; - } - if (blocks.size () == 1 && blocks.front ()->hash () == start.as_block_hash ()) - { - return verify_result::nothing_new; - } - - auto const & first = blocks.front (); - switch (type) - { - case query_type::blocks_by_hash: - { - if (first->hash () != start.as_block_hash ()) - { - // TODO: Stat & log - return verify_result::invalid; - } - } - break; - case query_type::blocks_by_account: - { - // Open & state blocks always contain account field - if (first->account () != start.as_account ()) - { - // TODO: Stat & log - return verify_result::invalid; - } - } - break; - default: - debug_assert (false, "invalid type"); - return verify_result::invalid; - } - - // Verify blocks make a valid chain - nano::block_hash previous_hash = blocks.front ()->hash (); - for (int n = 1; n < blocks.size (); ++n) - { - auto & block = blocks[n]; - if (block->previous () != previous_hash) - { - // TODO: Stat & log - return verify_result::invalid; // Blocks do not make a chain - } - previous_hash = block->hash (); - } - - return verify_result::ok; + account_scan.process (response, tag); } void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const lazy_pulling::tag & tag) @@ -520,26 +578,12 @@ void nano::bootstrap_ascending::service::track (async_tag const & tag) tags.get ().insert (tag); } -auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t -{ - nano::lock_guard lock{ mutex }; - return accounts.info (); -} - -std::size_t nano::bootstrap_ascending::service::compute_throttle_size () const -{ - // Scales logarithmically with ledger block - // Returns: config.throttle_coefficient * sqrt(block_count) - std::size_t size_new = config.bootstrap_ascending.throttle_coefficient * std::sqrt (ledger.cache.block_count.load ()); - return size_new == 0 ? 16 : size_new; -} - std::unique_ptr nano::bootstrap_ascending::service::collect_container_info (std::string const & name) { nano::lock_guard lock{ mutex }; auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "tags", tags.size (), sizeof (decltype (tags)::value_type) })); - composite->add_component (accounts.collect_container_info ("accounts")); + // composite->add_component (accounts.collect_container_info ("accounts")); return composite; } diff --git a/nano/node/bootstrap_ascending/service.hpp b/nano/node/bootstrap_ascending/service.hpp index c21f2f02b8..6cc563bf02 100644 --- a/nano/node/bootstrap_ascending/service.hpp +++ b/nano/node/bootstrap_ascending/service.hpp @@ -36,6 +36,8 @@ namespace transport namespace nano::bootstrap_ascending { +class service; + template class tag_base { @@ -88,6 +90,59 @@ class account_scan final verify_result verify (nano::asc_pull_ack::blocks_payload const & response) const; }; + +public: + account_scan (bootstrap_ascending_config const &, service &, nano::ledger &, nano::network_constants &, nano::block_processor &, nano::stats &); + ~account_scan (); + + void start (); + void stop (); + + void process (nano::asc_pull_ack::blocks_payload const & response, account_scan::tag const &); + void cleanup (); + + std::size_t blocked_size () const; + std::size_t priority_size () const; + +private: // Dependencies + bootstrap_ascending_config const & config; + service & service; + nano::ledger & ledger; + nano::network_constants & network_consts; + nano::block_processor & block_processor; + nano::stats & stats; + +private: + void run (); + void run_one (); + + /* Inspects a block that has been processed by the block processor */ + void inspect (store::transaction const &, nano::process_return const & result, nano::block const & block); + + /* Waits until a suitable account outside of cool down period is available */ + nano::account available_account (); + nano::account wait_available_account (); + + /* Throttles requesting new blocks, not to overwhelm blockprocessor */ + void wait_blockprocessor (); + + void throttle_if_needed (nano::unique_lock & lock); + // Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count + std::size_t compute_throttle_size () const; + +private: + nano::bootstrap_ascending::account_sets accounts; + nano::bootstrap_ascending::buffered_iterator iterator; + nano::bootstrap_ascending::throttle throttle; + + // Requests for accounts from database have much lower hitrate and could introduce strain on the network + // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue + nano::bandwidth_limiter database_limiter; + + bool stopped{ false }; + mutable nano::mutex mutex; + mutable nano::condition_variable condition; + std::thread thread; }; class lazy_pulling final @@ -112,10 +167,10 @@ class service final */ void process (nano::asc_pull_ack const & message, std::shared_ptr channel); + void wait_next (); + public: // Info std::unique_ptr collect_container_info (std::string const & name); - std::size_t blocked_size () const; - std::size_t priority_size () const; std::size_t score_size () const; private: // Dependencies @@ -132,6 +187,8 @@ class service final using tag_strategy_variant = std::variant; + void request (tag_strategy_variant const &); + public: // Tag struct async_tag { @@ -151,26 +208,13 @@ class service final nano::observer_set on_timeout; private: - /* Inspects a block that has been processed by the block processor */ - void inspect (store::transaction const &, nano::process_return const & result, nano::block const & block); - - void throttle_if_needed (nano::unique_lock & lock); - void run (); - bool run_one (); void run_timeouts (); bool request (tag_strategy_variant const &, std::shared_ptr &); void track (async_tag const & tag); - std::optional wait_next (); - - /* Throttles requesting new blocks, not to overwhelm blockprocessor */ - void wait_blockprocessor (); /* Waits for channel with free capacity for bootstrap messages */ std::shared_ptr wait_available_channel (); - /* Waits until a suitable account outside of cool down period is available */ - nano::account available_account (); - nano::account wait_available_account (); public: nano::asc_pull_req::payload_variant prepare (account_scan::tag &); @@ -179,17 +223,7 @@ class service final void process (nano::asc_pull_ack::blocks_payload const & response, account_scan::tag const &); void process (nano::asc_pull_ack::account_info_payload const & response, lazy_pulling::tag const &); -public: // account_sets - nano::bootstrap_ascending::account_sets::info_t info () const; - private: - nano::bootstrap_ascending::account_sets accounts; - nano::bootstrap_ascending::buffered_iterator iterator; - nano::bootstrap_ascending::throttle throttle; - - // Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count - std::size_t compute_throttle_size () const; - // clang-format off class tag_sequenced {}; class tag_id {}; @@ -205,9 +239,6 @@ class service final ordered_tags tags; nano::bootstrap_ascending::peer_scoring scoring; - // Requests for accounts from database have much lower hitrate and could introduce strain on the network - // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue - nano::bandwidth_limiter database_limiter; bool stopped{ false }; mutable nano::mutex mutex; diff --git a/nano/slow_test/bootstrap.cpp b/nano/slow_test/bootstrap.cpp index 250bc4de8e..5f550cec4a 100644 --- a/nano/slow_test/bootstrap.cpp +++ b/nano/slow_test/bootstrap.cpp @@ -177,8 +177,6 @@ TEST (bootstrap_ascending, profile) rate.observe ("count", [&] () { return client->ledger.cache.block_count.load (); }); rate.observe ("unchecked", [&] () { return client->unchecked.count (); }); rate.observe ("block_processor", [&] () { return client->block_processor.size (); }); - rate.observe ("priority", [&] () { return client->ascendboot.priority_size (); }); - rate.observe ("blocking", [&] () { return client->ascendboot.blocked_size (); }); rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::reply, nano::stat::dir::in); rate.observe (*client, nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in);