Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize node wallet frontiers during background confirmations #2154

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2983,12 +2983,7 @@ TEST (confirmation_height, prioritize_frontiers)
nano::keypair key2;
nano::keypair key3;
nano::keypair key4;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub));
system.wallet (0)->insert_adhoc (key1.prv);
system.wallet (0)->insert_adhoc (key2.prv);
system.wallet (0)->insert_adhoc (key3.prv);
system.wallet (0)->insert_adhoc (key4.prv);

// Send different numbers of blocks all accounts
nano::send_block send1 (latest1, key1.pub, node->config.online_weight_minimum.number () + 10000, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1));
Expand Down Expand Up @@ -3036,20 +3031,49 @@ TEST (confirmation_height, prioritize_frontiers)
}

auto transaction = node->store.tx_begin_read ();
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1));
constexpr auto num_accounts = 5;
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts);
// Check the order of accounts is as expected (greatest number of uncemented blocks at the front). key3 and key4 have the same value, the order is unspecified so check both
std::array<nano::account, num_accounts> desired_order_1 { nano::genesis_account, key3.pub, key4.pub, key1.pub, key2.pub };
std::array<nano::account, num_accounts> desired_order_2 { nano::genesis_account, key4.pub, key3.pub, key1.pub, key2.pub };
// clang-format off
auto priority_orders_match = [&priority_cementable_frontiers = node->active.priority_cementable_frontiers](std::array<nano::account, num_accounts> const & desired_order) {
return std::equal (desired_order.begin (), desired_order.end (), priority_cementable_frontiers.get<1> ().begin (), priority_cementable_frontiers.get<1> ().end (), [](nano::account const & account, nano::cementable_account const & cementable_account) {
auto priority_orders_match = [](auto const & cementable_frontiers, auto const & desired_order) {
return std::equal (desired_order.begin (), desired_order.end (), cementable_frontiers.template get<1> ().begin (), cementable_frontiers.template get<1> ().end (), [](nano::account const & account, nano::cementable_account const & cementable_account) {
return (account == cementable_account.account);
});
};
// clang-format on
ASSERT_TRUE (priority_orders_match (desired_order_1) || priority_orders_match (desired_order_2));
{
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts);
// Check the order of accounts is as expected (greatest number of uncemented blocks at the front). key3 and key4 have the same value, the order is unspecified so check both
std::array<nano::account, num_accounts> desired_order_1{ nano::genesis_account, key3.pub, key4.pub, key1.pub, key2.pub };
std::array<nano::account, num_accounts> desired_order_2{ nano::genesis_account, key4.pub, key3.pub, key1.pub, key2.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_cementable_frontiers, desired_order_1) || priority_orders_match (node->active.priority_cementable_frontiers, desired_order_2));
}

{
// Add some to the local node wallets and check ordering of both containers
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (key1.prv);
system.wallet (0)->insert_adhoc (key2.prv);
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts - 3);
ASSERT_EQ (node->active.priority_wallet_cementable_frontiers_size (), num_accounts - 2);
std::array<nano::account, 3> local_desired_order{ nano::genesis_account, key1.pub, key2.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_wallet_cementable_frontiers, local_desired_order));
std::array<nano::account, 2> desired_order_1{ key3.pub, key4.pub };
std::array<nano::account, 2> desired_order_2{ key4.pub, key3.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_cementable_frontiers, desired_order_1) || priority_orders_match (node->active.priority_cementable_frontiers, desired_order_2));
}

{
// Add the remainder of accounts to node wallets and check size/ordering is correct
system.wallet (0)->insert_adhoc (key3.prv);
system.wallet (0)->insert_adhoc (key4.prv);
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), 0);
ASSERT_EQ (node->active.priority_wallet_cementable_frontiers_size (), num_accounts);
std::array<nano::account, num_accounts> desired_order_1{ nano::genesis_account, key3.pub, key4.pub, key1.pub, key2.pub };
std::array<nano::account, num_accounts> desired_order_2{ nano::genesis_account, key4.pub, key3.pub, key1.pub, key2.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_wallet_cementable_frontiers, desired_order_1) || priority_orders_match (node->active.priority_wallet_cementable_frontiers, desired_order_2));
}

// Check that accounts which already exist have their order modified when the uncemented count changes.
nano::send_block send12 (send9.hash (), nano::test_genesis_key.pub, 100, key3.prv, key3.pub, system.work.generate (send9.hash ()));
Expand All @@ -3068,8 +3092,8 @@ TEST (confirmation_height, prioritize_frontiers)
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send17).code);
}
transaction.refresh ();
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1));
ASSERT_TRUE (priority_orders_match ({ key3.pub, nano::genesis_account, key4.pub, key1.pub, key2.pub }));
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_TRUE (priority_orders_match (node->active.priority_wallet_cementable_frontiers, std::array<nano::account, num_accounts>{ key3.pub, nano::genesis_account, key4.pub, key1.pub, key2.pub }));

// Check that the active transactions roots contains the frontiers
{
Expand Down
201 changes: 143 additions & 58 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,36 +58,41 @@ void nano::active_transactions::confirm_frontiers (nano::transaction const & tra
}

// Spend time prioritizing accounts to reduce voting traffic
auto time_to_spend_prioritizing = (frontiers_fully_confirmed ? std::chrono::milliseconds (200) : std::chrono::seconds (2));
prioritize_frontiers_for_confirmation (transaction_a, is_test_network ? std::chrono::milliseconds (50) : time_to_spend_prioritizing);
auto time_spent_prioritizing_ledger_accounts = (frontiers_fully_confirmed ? std::chrono::milliseconds (200) : std::chrono::seconds (2));
auto time_spent_prioritizing_wallet_accounts = std::chrono::milliseconds (50);
prioritize_frontiers_for_confirmation (transaction_a, is_test_network ? std::chrono::milliseconds (50) : time_spent_prioritizing_ledger_accounts, time_spent_prioritizing_wallet_accounts);

size_t elections_count (0);
lk.lock ();
while (!priority_cementable_frontiers.empty () && !stopped && elections_count < max_elections)
{
auto cementable_account_front_it = priority_cementable_frontiers.get<1> ().begin ();
auto cementable_account = *cementable_account_front_it;
priority_cementable_frontiers.get<1> ().erase (cementable_account_front_it);
lk.unlock ();
nano::account_info info;
auto error = node.store.account_get (transaction_a, cementable_account.account, info);
release_assert (!error);

if (info.block_count > info.confirmation_height && !node.pending_confirmation_height.is_processing_block (info.head))
auto start_elections_for_prioritized_frontiers = [&transaction_a, &elections_count, max_elections, &lk, &representative, this](prioritize_num_uncemented & cementable_frontiers) {
while (!cementable_frontiers.empty () && !this->stopped && elections_count < max_elections)
{
auto block (node.store.block_get (transaction_a, info.head));
if (!start (block))
auto cementable_account_front_it = cementable_frontiers.get<1> ().begin ();
auto cementable_account = *cementable_account_front_it;
cementable_frontiers.get<1> ().erase (cementable_account_front_it);
lk.unlock ();
nano::account_info info;
auto error = this->node.store.account_get (transaction_a, cementable_account.account, info);
release_assert (!error);

if (info.block_count > info.confirmation_height && !this->node.pending_confirmation_height.is_processing_block (info.head))
{
++elections_count;
// Calculate votes for local representatives
if (representative)
auto block (this->node.store.block_get (transaction_a, info.head));
if (!this->start (block))
{
node.block_processor.generator.add (block->hash ());
++elections_count;
// Calculate votes for local representatives
if (representative)
{
this->node.block_processor.generator.add (block->hash ());
}
}
}
lk.lock ();
}
lk.lock ();
}
};
start_elections_for_prioritized_frontiers (priority_cementable_frontiers);
start_elections_for_prioritized_frontiers (priority_wallet_cementable_frontiers);
frontiers_fully_confirmed = (elections_count < max_elections);
// 4 times slower check if all frontiers were confirmed
auto fully_confirmed_factor = frontiers_fully_confirmed ? 4 : 1;
Expand Down Expand Up @@ -346,73 +351,146 @@ void nano::active_transactions::request_loop ()
}
}

void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds time_a)
void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a)
{
if (info_a.block_count > info_a.confirmation_height && !node.pending_confirmation_height.is_processing_block (info_a.head))
{
auto num_uncemented = info_a.block_count - info_a.confirmation_height;
std::lock_guard<std::mutex> guard (mutex);
auto it = cementable_frontiers_a.find (account_a);
if (it != cementable_frontiers_a.end ())
{
if (it->blocks_uncemented != num_uncemented)
{
// Account already exists and there is now a different uncemented block count so update it in the container
cementable_frontiers_a.modify (it, [num_uncemented](nano::cementable_account & info) {
info.blocks_uncemented = num_uncemented;
});
}
}
else
{
assert (cementable_frontiers_size_a <= max_priority_cementable_frontiers);
if (cementable_frontiers_size_a == max_priority_cementable_frontiers)
{
// The maximum amount of frontiers stored has been reached. Check if the current frontier
// has more uncemented blocks than the lowest uncemented frontier in the collection if so replace it.
auto least_uncemented_frontier_it = cementable_frontiers_a.get<1> ().end ();
--least_uncemented_frontier_it;
if (num_uncemented > least_uncemented_frontier_it->blocks_uncemented)
{
cementable_frontiers_a.get<1> ().erase (least_uncemented_frontier_it);
cementable_frontiers_a.emplace (account_a, num_uncemented);
}
}
else
{
cementable_frontiers_a.emplace (account_a, num_uncemented);
}
}
cementable_frontiers_size_a = cementable_frontiers_a.size ();
}
}

void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds ledger_accounts_time_a, std::chrono::milliseconds wallet_account_time_a)
{
// Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable
nano::timer<std::chrono::milliseconds> timer;
timer.start ();
if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off)
{
auto i (node.store.latest_begin (transaction_a, next_frontier_account));
auto n (node.store.latest_end ());
std::unique_lock<std::mutex> lk (mutex);
auto priority_cementable_frontiers_size = priority_cementable_frontiers.size ();
auto priority_wallet_cementable_frontiers_size = priority_wallet_cementable_frontiers.size ();
lk.unlock ();
for (; i != n && !stopped; ++i)

nano::timer<std::chrono::milliseconds> wallet_account_timer;
wallet_account_timer.start ();

if (!skip_wallets)
{
auto const & account (i->first);
auto const & info (i->second);
if (info.block_count > info.confirmation_height && !node.pending_confirmation_height.is_processing_block (info.head))
// Prioritize wallet accounts first
{
auto num_uncemented = info.block_count - info.confirmation_height;
lk.lock ();
auto it = priority_cementable_frontiers.find (account);
if (it != priority_cementable_frontiers.end ())
std::lock_guard<std::mutex> lock (node.wallets.mutex);
auto wallet_transaction (node.wallets.tx_begin_read ());
auto const & items = node.wallets.items;
for (auto item_it = items.cbegin (); item_it != items.cend (); ++item_it)
{
if (it->blocks_uncemented != num_uncemented)
// Skip this wallet if it has been traversed already while there are others still awaiting
if (wallet_accounts_already_iterated.find (item_it->first) != wallet_accounts_already_iterated.end ())
{
// Account already exists and there is now a different uncemented block count so update it in the container
priority_cementable_frontiers.modify (it, [num_uncemented](nano::cementable_account & info) {
info.blocks_uncemented = num_uncemented;
});
continue;
}
}
else
{
assert (priority_cementable_frontiers_size <= max_priority_cementable_frontiers);
if (priority_cementable_frontiers_size == max_priority_cementable_frontiers)

nano::account_info info;
auto & wallet (item_it->second);
std::lock_guard<std::recursive_mutex> wallet_lock (wallet->store.mutex);

auto & next_wallet_frontier_account = next_wallet_frontier_accounts.emplace (item_it->first, wallet_store::special_count).first->second;

auto i (wallet->store.begin (wallet_transaction, next_wallet_frontier_account));
auto n (wallet->store.end ());
for (; i != n; ++i)
{
// The maximum amount of frontiers stored has been reached. Check if the current frontier
// has more uncemented blocks than the lowest uncemented frontier in the collection if so replace it.
auto least_uncemented_frontier_it = priority_cementable_frontiers.get<1> ().end ();
--least_uncemented_frontier_it;
if (num_uncemented > least_uncemented_frontier_it->blocks_uncemented)
auto & account (i->first);
if (!node.store.account_get (transaction_a, account, info))
{
priority_cementable_frontiers.get<1> ().erase (least_uncemented_frontier_it);
priority_cementable_frontiers.emplace (account, num_uncemented);
// If it exists in normal priority collection delete from there.
auto it = priority_cementable_frontiers.find (account);
if (it != priority_cementable_frontiers.end ())
{
priority_cementable_frontiers.erase (it);
}

prioritize_account_for_confirmation (priority_wallet_cementable_frontiers, priority_wallet_cementable_frontiers_size, account, info);

if (wallet_account_timer.since_start () >= wallet_account_time_a)
{
break;
}
}
next_wallet_frontier_account = account.number () + 1;
}
else
// Go back to the beginning when we have reached the end of the wallet accounts for this wallet
if (i == n)
{
priority_cementable_frontiers.emplace (account, num_uncemented);
wallet_accounts_already_iterated.emplace (item_it->first);
next_wallet_frontier_accounts.at (item_it->first) = wallet_store::special_count;

// Skip wallet accounts when they have all been traversed
if (std::next (item_it) == items.cend ())
{
wallet_accounts_already_iterated.clear ();
skip_wallets = true;
}
}
}
priority_cementable_frontiers_size = priority_cementable_frontiers.size ();
lk.unlock ();
}
}

next_frontier_account = account.number () + 1;
nano::timer<std::chrono::milliseconds> timer;
timer.start ();

if (timer.since_start () >= time_a)
auto i (node.store.latest_begin (transaction_a, next_frontier_account));
auto n (node.store.latest_end ());
for (; i != n && !stopped; ++i)
{
auto const & account (i->first);
auto const & info (i->second);
if (priority_wallet_cementable_frontiers.find (account) == priority_wallet_cementable_frontiers.end ())
{
prioritize_account_for_confirmation (priority_cementable_frontiers, priority_cementable_frontiers_size, account, info);
}
next_frontier_account = account.number () + 1;
if (timer.since_start () >= ledger_accounts_time_a)
{
break;
}
}

// Go back to the beginning when we have reached the end of the accounts
// Go back to the beginning when we have reached the end of the accounts and start with wallet accounts next time
if (i == n)
{
next_frontier_account = 0;
skip_wallets = false;
}
}
}
Expand Down Expand Up @@ -789,6 +867,12 @@ size_t nano::active_transactions::priority_cementable_frontiers_size ()
return priority_cementable_frontiers.size ();
}

size_t nano::active_transactions::priority_wallet_cementable_frontiers_size ()
{
std::lock_guard<std::mutex> guard (mutex);
return priority_wallet_cementable_frontiers.size ();
}

boost::circular_buffer<double> nano::active_transactions::difficulty_trend ()
{
std::lock_guard<std::mutex> guard (mutex);
Expand Down Expand Up @@ -819,6 +903,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (active_transaction
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "priority_wallet_cementable_frontiers_count", active_transactions.priority_wallet_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
return composite;
}
Expand Down
Loading