diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index 3b71fd9ac1..b6ddc64ac8 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -90,16 +90,19 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::unchecked: thread_role_name_string = "Unchecked"; break; + case nano::thread_role::name::backlog_population: + thread_role_name_string = "Backlog"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } /* - * We want to constrain the thread names to 15 - * characters, since this is the smallest maximum - * length supported by the platforms we support - * (specifically, Linux) - */ + * We want to constrain the thread names to 15 + * characters, since this is the smallest maximum + * length supported by the platforms we support + * (specifically, Linux) + */ debug_assert (thread_role_name_string.size () < 16); return (thread_role_name_string); } @@ -121,7 +124,7 @@ void nano::thread_role::set (nano::thread_role::name role) void nano::thread_attributes::set (boost::thread::attributes & attrs) { auto attrs_l (&attrs); - attrs_l->set_stack_size (8000000); //8MB + attrs_l->set_stack_size (8000000); // 8MB } nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) : diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index aab4771bff..a06e2eb9f5 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -42,6 +42,7 @@ namespace thread_role db_parallel_traversal, election_scheduler, unchecked, + backlog_population }; /* diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index ba02c0f19e..e02354ee3d 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -16,6 +16,8 @@ add_library( ${platform_sources} active_transactions.hpp active_transactions.cpp + backlog_population.hpp + backlog_population.cpp blockprocessor.hpp blockprocessor.cpp bootstrap/bootstrap_attempt.hpp diff --git a/nano/node/backlog_population.cpp b/nano/node/backlog_population.cpp new file mode 100644 index 0000000000..b38d90af5c --- /dev/null +++ b/nano/node/backlog_population.cpp @@ -0,0 +1,96 @@ +#include +#include +#include +#include +#include + +nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) : + config_m{ config_a }, + store_m{ store_a }, + scheduler{ scheduler_a } +{ +} + +nano::backlog_population::~backlog_population () +{ + stop (); + if (thread.joinable ()) + { + thread.join (); + } +} + +void nano::backlog_population::start () +{ + if (!thread.joinable ()) + { + thread = std::thread{ [this] () { run (); } }; + } +} + +void nano::backlog_population::stop () +{ + nano::unique_lock lock{ mutex }; + stopped = true; + notify (); +} + +void nano::backlog_population::trigger () +{ + nano::unique_lock lock{ mutex }; + triggered = true; + notify (); +} + +void nano::backlog_population::notify () +{ + condition.notify_all (); +} + +bool nano::backlog_population::predicate () const +{ + return triggered; +} + +void nano::backlog_population::run () +{ + nano::thread_role::set (nano::thread_role::name::backlog_population); + const auto delay = std::chrono::seconds{ config_m.delay_between_runs_in_seconds }; + nano::unique_lock lock{ mutex }; + while (!stopped) + { + if (predicate () || config_m.ongoing_backlog_population_enabled) + { + triggered = false; + lock.unlock (); + populate_backlog (); + lock.lock (); + } + + condition.wait_for (lock, delay, [this] () { + return stopped || predicate (); + }); + } +} + +void nano::backlog_population::populate_backlog () +{ + auto done = false; + uint64_t const chunk_size = 65536; + nano::account next = 0; + uint64_t total = 0; + while (!stopped && !done) + { + auto transaction = store_m.tx_begin_read (); + auto count = 0; + auto i = store_m.account.begin (transaction, next); + const auto end = store_m.account.end (); + for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total) + { + auto const & account = i->first; + scheduler.activate (account, transaction); + next = account.number () + 1; + } + done = store_m.account.begin (transaction, next) == end; + } +} diff --git a/nano/node/backlog_population.hpp b/nano/node/backlog_population.hpp new file mode 100644 index 0000000000..a4a352dca7 --- /dev/null +++ b/nano/node/backlog_population.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include +#include +#include + +namespace nano +{ +class store; +class election_scheduler; + +class backlog_population final +{ +public: + struct config + { + bool ongoing_backlog_population_enabled; + unsigned int delay_between_runs_in_seconds; + }; + + explicit backlog_population (const config & config_a, store & store, election_scheduler & scheduler); + ~backlog_population (); + + void start (); + void stop (); + void trigger (); + + /** Other components call this to notify us about external changes, so we can check our predicate. */ + void notify (); + +private: + void run (); + bool predicate () const; + + void populate_backlog (); + + /** This is a manual trigger, the ongoing backlog population does not use this. + * It can be triggered even when backlog population (frontiers confirmation) is disabled. */ + bool triggered{ false }; + + std::atomic stopped{ false }; + + nano::condition_variable condition; + mutable nano::mutex mutex; + + /** Thread that runs the backlog implementation logic. The thread always runs, even if + * backlog population is disabled, so that it can service a manual trigger (e.g. via RPC). */ + std::thread thread; + + config config_m; + +private: // Dependencies + store & store_m; + election_scheduler & scheduler; +}; +} diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 8095d5c534..d8ea6f7bfb 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -5224,6 +5224,13 @@ void nano::json_handler::work_peers_clear () response_errors (); } +void nano::json_handler::populate_backlog () +{ + node.backlog.trigger (); + response_l.put ("success", ""); + response_errors (); +} + void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function response_a) { // Note that if the rpc action is async, the shared_ptr lifetime will be extended by the action handler @@ -5388,6 +5395,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("work_peer_add", &nano::json_handler::work_peer_add); no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers); no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear); + no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog); return no_arg_funcs; } diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index a2e4f1e0c1..4d2a055d14 100644 --- a/nano/node/json_handler.hpp +++ b/nano/node/json_handler.hpp @@ -89,6 +89,7 @@ class json_handler : public std::enable_shared_from_this void pending_exists (); void receivable (); void receivable_exists (); + void populate_backlog (); void process (); void pruned_exists (); void receive (); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index ad0ebc0997..bc0f1c8b92 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -33,6 +33,14 @@ extern unsigned char nano_bootstrap_weights_beta[]; extern std::size_t nano_bootstrap_weights_beta_size; } +nano::backlog_population::config nano::nodeconfig_to_backlog_population_config (const nano::node_config & config) +{ + nano::backlog_population::config cfg; + cfg.ongoing_backlog_population_enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled; + cfg.delay_between_runs_in_seconds = config.network_params.network.is_dev_network () ? 1u : 300u; + return cfg; +} + void nano::node::keepalive (std::string const & address_a, uint16_t port_a) { auto node_l (shared_from_this ()); @@ -155,6 +163,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co scheduler{ *this }, aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active), wallets (wallets_store.init_error (), *this), + backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq) { @@ -702,12 +711,7 @@ void nano::node::start () port_mapping.start (); } wallets.start (); - if (config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled) - { - workers.push_task ([this_l = shared ()] () { - this_l->ongoing_backlog_population (); - }); - } + backlog.start (); } void nano::node::stop () @@ -1022,15 +1026,6 @@ void nano::node::ongoing_unchecked_cleanup () }); } -void nano::node::ongoing_backlog_population () -{ - populate_backlog (); - auto delay = config.network_params.network.is_dev_network () ? std::chrono::seconds{ 1 } : std::chrono::duration_cast (std::chrono::minutes{ 5 }); - workers.add_timed_task (std::chrono::steady_clock::now () + delay, [this_l = shared ()] () { - this_l->ongoing_backlog_population (); - }); -} - bool nano::node::collect_ledger_pruning_targets (std::deque & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a) { uint64_t read_operations (0); @@ -1794,26 +1789,6 @@ std::pair nano::node::get_ return { max_blocks, weights }; } -void nano::node::populate_backlog () -{ - auto done = false; - uint64_t const chunk_size = 65536; - nano::account next = 0; - uint64_t total = 0; - while (!stopped && !done) - { - auto transaction = store.tx_begin_read (); - auto count = 0; - for (auto i = store.account.begin (transaction, next), n = store.account.end (); !stopped && i != n && count < chunk_size; ++i, ++count, ++total) - { - auto const & account = i->first; - scheduler.activate (account, transaction); - next = account.number () + 1; - } - done = store.account.begin (transaction, next) == store.account.end (); - } -} - /** Convenience function to easily return the confirmation height of an account. */ uint64_t nano::node::get_confirmation_height (nano::transaction const & transaction_a, nano::account & account_a) { diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 7e7dd66d59..f1488596fe 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -83,9 +84,11 @@ class block_arrival final }; std::unique_ptr collect_container_info (block_arrival & block_arrival, std::string const & name); - std::unique_ptr collect_container_info (rep_crawler & rep_crawler, std::string const & name); +// Configs +backlog_population::config nodeconfig_to_backlog_population_config (const node_config & config); + class node final : public std::enable_shared_from_this { public: @@ -123,7 +126,6 @@ class node final : public std::enable_shared_from_this void ongoing_bootstrap (); void ongoing_peer_store (); void ongoing_unchecked_cleanup (); - void ongoing_backlog_population (); void backup_wallet (); void search_receivable_all (); void bootstrap_wallet (); @@ -154,7 +156,6 @@ class node final : public std::enable_shared_from_this bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t); void set_bandwidth_params (std::size_t limit, double ratio); std::pair get_bootstrap_weights () const; - void populate_backlog (); uint64_t get_confirmation_height (nano::transaction const &, nano::account &); nano::write_database_queue write_database_queue; boost::asio::io_context & io_ctx; @@ -198,6 +199,8 @@ class node final : public std::enable_shared_from_this nano::election_scheduler scheduler; nano::request_aggregator aggregator; nano::wallets wallets; + nano::backlog_population backlog; + std::chrono::steady_clock::time_point const startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week std::atomic unresponsive_work_peers{ false }; diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index d939d53589..69e95d00a6 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -162,6 +162,7 @@ std::unordered_set create_rpc_control_impls () set.emplace ("ledger"); set.emplace ("node_id"); set.emplace ("password_change"); + set.emplace ("populate_backlog"); set.emplace ("receive"); set.emplace ("receive_minimum"); set.emplace ("receive_minimum_set"); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 3b3a31221b..ff574f1686 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -4994,6 +4994,42 @@ TEST (rpc, work_peers_all) ASSERT_EQ (0, peers_node.size ()); } +TEST (rpc, populate_backlog) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + // Disable automatic backlog population + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto node = add_ipc_enabled_node (system, node_config); + + // Create and process a block that won't get automatically scheduled for confirmation + nano::keypair key; + nano::block_builder builder; + auto latest (node->latest (nano::dev::genesis_key.pub)); + auto genesis_balance (nano::dev::constants.genesis_amount); + auto send_amount (genesis_balance - 100); + auto send = builder + .send () + .previous (latest) + .destination (key.pub) + .balance (genesis_balance) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*node->work_generate_blocking (latest)) + .build (); + ASSERT_EQ (nano::process_result::progress, node->process (*send).code); + ASSERT_FALSE (node->block_arrival.recent (send->hash ())); + + auto const rpc_ctx = add_rpc (system, node); + boost::property_tree::ptree request; + request.put ("action", "populate_backlog"); + auto response (wait_response (system, rpc_ctx, request)); + std::string success (response.get ("success", "")); + ASSERT_TRUE (success.empty ()); + + // Ensure block got activated and election was started + ASSERT_TIMELY (5s, node->active.active (*send)); +} + TEST (rpc, ledger) { nano::system system; @@ -5726,7 +5762,7 @@ TEST (rpc, online_reps) boost::optional weight (item->second.get_optional ("weight")); ASSERT_FALSE (weight.is_initialized ()); ASSERT_TIMELY (5s, node2->block (send_block->hash ())); - //Test weight option + // Test weight option request.put ("weight", "true"); auto response2 (wait_response (system, rpc_ctx, request)); auto representatives2 (response2.get_child ("representatives")); @@ -5735,7 +5771,7 @@ TEST (rpc, online_reps) ASSERT_EQ (nano::dev::genesis_key.pub.to_account (), item2->first); auto weight2 (item2->second.get ("weight")); ASSERT_EQ (node2->weight (nano::dev::genesis_key.pub).convert_to (), weight2); - //Test accounts filter + // Test accounts filter rpc_ctx.io_scope->reset (); auto new_rep (system.wallet (1)->deterministic_insert ()); auto send (system.wallet (0)->send_action (nano::dev::genesis_key.pub, new_rep, node1->config.receive_minimum.number ()));