Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add populate_backlog rpc command #3860

Merged
merged 15 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions nano/lib/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,19 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::unchecked:
thread_role_name_string = "Unchecked";
break;
case nano::thread_role::name::backlog_population:
thread_role_name_string = "Backlog";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}

/*
* We want to constrain the thread names to 15
* characters, since this is the smallest maximum
* length supported by the platforms we support
* (specifically, Linux)
*/
* We want to constrain the thread names to 15
* characters, since this is the smallest maximum
* length supported by the platforms we support
* (specifically, Linux)
*/
debug_assert (thread_role_name_string.size () < 16);
return (thread_role_name_string);
}
Expand All @@ -121,7 +124,7 @@ void nano::thread_role::set (nano::thread_role::name role)
void nano::thread_attributes::set (boost::thread::attributes & attrs)
{
auto attrs_l (&attrs);
attrs_l->set_stack_size (8000000); //8MB
attrs_l->set_stack_size (8000000); // 8MB
}

nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) :
Expand Down
1 change: 1 addition & 0 deletions nano/lib/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace thread_role
db_parallel_traversal,
election_scheduler,
unchecked,
backlog_population
};

/*
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ add_library(
${platform_sources}
active_transactions.hpp
active_transactions.cpp
backlog_population.hpp
backlog_population.cpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/bootstrap_attempt.hpp
Expand Down
95 changes: 95 additions & 0 deletions nano/node/backlog_population.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#include <nano/lib/threading.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/secure/store.hpp>

nano::backlog_population::backlog_population (nano::node_config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) :
config{ config_a },
store{ store_a },
scheduler{ scheduler_a }
{
}

nano::backlog_population::~backlog_population ()
{
stop ();
if (thread.joinable ())
{
thread.join ();
}
}

void nano::backlog_population::start ()
{
if (!thread.joinable ())
{
thread = std::thread{ [this] () { run (); } };
}
}

void nano::backlog_population::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
notify ();
}

void nano::backlog_population::trigger ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
triggered = true;
notify ();
}

void nano::backlog_population::notify ()
{
condition.notify_all ();
}

bool nano::backlog_population::predicate () const
{
return triggered;
}

void nano::backlog_population::run ()
{
nano::thread_role::set (nano::thread_role::name::backlog_population);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (predicate () || (config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled))
{
triggered = false;
lock.unlock ();
populate_backlog ();
lock.lock ();
}

auto delay = config.network_params.network.is_dev_network () ? std::chrono::seconds{ 1 } : std::chrono::duration_cast<std::chrono::seconds> (std::chrono::minutes{ 5 });

condition.wait_for (lock, delay, [this] () {
return stopped || predicate ();
});
}
}

void nano::backlog_population::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store.tx_begin_read ();
auto count = 0;
for (auto i = store.account.begin (transaction, next), n = store.account.end (); !stopped && i != n && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store.account.begin (transaction, next) == store.account.end ();
}
}
44 changes: 44 additions & 0 deletions nano/node/backlog_population.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include <nano/lib/locks.hpp>

#include <atomic>
#include <condition_variable>
#include <thread>

namespace nano
{
class node_config;
class store;
class election_scheduler;

class backlog_population final
{
public:
explicit backlog_population (node_config & config, store & store, election_scheduler & scheduler);
~backlog_population ();

void start ();
void stop ();
void trigger ();
void notify ();

private:
void run ();
bool predicate () const;

void populate_backlog ();

bool triggered{ false };
std::atomic<bool> stopped{ false };

nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;

private: // Dependencies
node_config & config;
store & store;
election_scheduler & scheduler;
};
}
8 changes: 8 additions & 0 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5224,6 +5224,13 @@ void nano::json_handler::work_peers_clear ()
response_errors ();
}

void nano::json_handler::populate_backlog ()
{
node.backlog.trigger ();
response_l.put ("success", "");
response_errors ();
}

void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function<void (std::string const &)> response_a)
{
// Note that if the rpc action is async, the shared_ptr<json_handler> lifetime will be extended by the action handler
Expand Down Expand Up @@ -5388,6 +5395,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map ()
no_arg_funcs.emplace ("work_peer_add", &nano::json_handler::work_peer_add);
no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers);
no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear);
no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog);
return no_arg_funcs;
}

Expand Down
1 change: 1 addition & 0 deletions nano/node/json_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class json_handler : public std::enable_shared_from_this<nano::json_handler>
void pending_exists ();
void receivable ();
void receivable_exists ();
void populate_backlog ();
void process ();
void pruned_exists ();
void receive ();
Expand Down
37 changes: 2 additions & 35 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
scheduler{ *this },
aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ config, store, scheduler },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
{
Expand Down Expand Up @@ -702,12 +703,7 @@ void nano::node::start ()
port_mapping.start ();
}
wallets.start ();
if (config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled)
{
workers.push_task ([this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}
backlog.start ();
}

void nano::node::stop ()
Expand Down Expand Up @@ -1022,15 +1018,6 @@ void nano::node::ongoing_unchecked_cleanup ()
});
}

void nano::node::ongoing_backlog_population ()
{
populate_backlog ();
auto delay = config.network_params.network.is_dev_network () ? std::chrono::seconds{ 1 } : std::chrono::duration_cast<std::chrono::seconds> (std::chrono::minutes{ 5 });
workers.add_timed_task (std::chrono::steady_clock::now () + delay, [this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}

bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
Expand Down Expand Up @@ -1794,26 +1781,6 @@ std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_
return { max_blocks, weights };
}

void nano::node::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store.tx_begin_read ();
auto count = 0;
for (auto i = store.account.begin (transaction, next), n = store.account.end (); !stopped && i != n && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store.account.begin (transaction, next) == store.account.end ();
}
}

/** Convenience function to easily return the confirmation height of an account. */
uint64_t nano::node::get_confirmation_height (nano::transaction const & transaction_a, nano::account & account_a)
{
Expand Down
5 changes: 3 additions & 2 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
Expand Down Expand Up @@ -123,7 +124,6 @@ class node final : public std::enable_shared_from_this<nano::node>
void ongoing_bootstrap ();
void ongoing_peer_store ();
void ongoing_unchecked_cleanup ();
void ongoing_backlog_population ();
void backup_wallet ();
void search_receivable_all ();
void bootstrap_wallet ();
Expand Down Expand Up @@ -154,7 +154,6 @@ class node final : public std::enable_shared_from_this<nano::node>
bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
void set_bandwidth_params (std::size_t limit, double ratio);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
void populate_backlog ();
uint64_t get_confirmation_height (nano::transaction const &, nano::account &);
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
Expand Down Expand Up @@ -198,6 +197,8 @@ class node final : public std::enable_shared_from_this<nano::node>
nano::election_scheduler scheduler;
nano::request_aggregator aggregator;
nano::wallets wallets;
nano::backlog_population backlog;

std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
std::atomic<bool> unresponsive_work_peers{ false };
Expand Down
1 change: 1 addition & 0 deletions nano/rpc/rpc_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ std::unordered_set<std::string> create_rpc_control_impls ()
set.emplace ("ledger");
set.emplace ("node_id");
set.emplace ("password_change");
set.emplace ("populate_backlog");
set.emplace ("receive");
set.emplace ("receive_minimum");
set.emplace ("receive_minimum_set");
Expand Down
40 changes: 38 additions & 2 deletions nano/rpc_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4994,6 +4994,42 @@ TEST (rpc, work_peers_all)
ASSERT_EQ (0, peers_node.size ());
}

TEST (rpc, populate_backlog)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
// Disable automatic backlog population
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = add_ipc_enabled_node (system, node_config);

// Create and process a block that won't get automatically scheduled for confirmation
nano::keypair key;
nano::block_builder builder;
auto latest (node->latest (nano::dev::genesis_key.pub));
auto genesis_balance (nano::dev::constants.genesis_amount);
auto send_amount (genesis_balance - 100);
auto send = builder
.send ()
.previous (latest)
.destination (key.pub)
.balance (genesis_balance)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node->work_generate_blocking (latest))
.build ();
ASSERT_EQ (nano::process_result::progress, node->process (*send).code);
ASSERT_FALSE (node->block_arrival.recent (send->hash ()));

auto const rpc_ctx = add_rpc (system, node);
boost::property_tree::ptree request;
request.put ("action", "populate_backlog");
auto response (wait_response (system, rpc_ctx, request));
std::string success (response.get<std::string> ("success", ""));
ASSERT_TRUE (success.empty ());

// Ensure block got activated and election was started
ASSERT_TIMELY (5s, node->active.active (*send));
}

TEST (rpc, ledger)
{
nano::system system;
Expand Down Expand Up @@ -5726,7 +5762,7 @@ TEST (rpc, online_reps)
boost::optional<std::string> weight (item->second.get_optional<std::string> ("weight"));
ASSERT_FALSE (weight.is_initialized ());
ASSERT_TIMELY (5s, node2->block (send_block->hash ()));
//Test weight option
// Test weight option
request.put ("weight", "true");
auto response2 (wait_response (system, rpc_ctx, request));
auto representatives2 (response2.get_child ("representatives"));
Expand All @@ -5735,7 +5771,7 @@ TEST (rpc, online_reps)
ASSERT_EQ (nano::dev::genesis_key.pub.to_account (), item2->first);
auto weight2 (item2->second.get<std::string> ("weight"));
ASSERT_EQ (node2->weight (nano::dev::genesis_key.pub).convert_to<std::string> (), weight2);
//Test accounts filter
// Test accounts filter
rpc_ctx.io_scope->reset ();
auto new_rep (system.wallet (1)->deterministic_insert ());
auto send (system.wallet (0)->send_action (nano::dev::genesis_key.pub, new_rep, node1->config.receive_minimum.number ()));
Expand Down