Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize frontiers for background confirmation #1982

Merged
80 changes: 74 additions & 6 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1645,10 +1645,11 @@ TEST (node, bootstrap_bulk_push)
// Bootstrapping a forked open block should succeed.
TEST (node, bootstrap_fork_open)
{
nano::system system0 (24000, 2);
auto delay_frontier_confirmation_height_updating = true;
nano::system system0;
auto node0 = system0.add_node (nano::node_config (24000, system0.logging), delay_frontier_confirmation_height_updating);
auto node1 = system0.add_node (nano::node_config (24001, system0.logging), delay_frontier_confirmation_height_updating);
system0.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto node0 (system0.nodes[0]);
auto node1 (system0.nodes[1]);
nano::keypair key0;
nano::send_block send0 (system0.nodes[0]->latest (nano::test_genesis_key.pub), key0.pub, nano::genesis_amount - 500, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0);
nano::open_block open0 (send0.hash (), 1, key0.pub, key0.prv, key0.pub, 0);
Expand Down Expand Up @@ -2448,8 +2449,9 @@ TEST (node, block_processor_reject_state)

TEST (node, block_processor_reject_rolled_back)
{
nano::system system (24000, 1);
auto & node (*system.nodes[0]);
auto delay_frontier_confirmation_height_updating = true;
nano::system system;
auto & node = *system.add_node (nano::node_config (24000, system.logging), delay_frontier_confirmation_height_updating);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send1);
Expand Down Expand Up @@ -2642,7 +2644,7 @@ TEST (node, unchecked_cleanup)
}
}

/** This checks that a node can be opened (without being blocked) when a write lock is held elsewhere */
/** This checks that a node can be opened (without being blocked) when a write lock is held elsewhere */
TEST (node, dont_write_lock_node)
{
auto path = nano::unique_path ();
Expand Down Expand Up @@ -2675,6 +2677,72 @@ TEST (node, dont_write_lock_node)
finished_promise.set_value ();
}

namespace nano
{
TEST (confirmation_height, prioritize_frontiers)
{
// Prevent frontiers being confirmed as it will affect the priorization checking
auto delay_frontier_confirmation_height_updating = true;
nano::system system;
auto node = system.add_node (nano::node_config (24001, system.logging), delay_frontier_confirmation_height_updating);

nano::keypair key1;
nano::keypair key2;
nano::keypair key3;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub));
system.wallet (0)->insert_adhoc (key1.prv);
system.wallet (0)->insert_adhoc (key2.prv);
system.wallet (0)->insert_adhoc (key3.prv);

// Send different numbers of blocks all accounts
nano::send_block send1 (latest1, key1.pub, node->config.online_weight_minimum.number () + 10000, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1));
nano::send_block send2 (send1.hash (), key1.pub, node->config.online_weight_minimum.number () + 8500, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ()));
nano::send_block send3 (send2.hash (), key1.pub, node->config.online_weight_minimum.number () + 8000, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send2.hash ()));
nano::send_block send4 (send3.hash (), key2.pub, node->config.online_weight_minimum.number () + 7500, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send3.hash ()));
nano::send_block send5 (send4.hash (), key3.pub, system.nodes.front ()->config.online_weight_minimum.number () + 6500, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send4.hash ()));

// Open all accounts and add other sends to get different
nano::open_block open1 (send1.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub));
nano::send_block send6 (open1.hash (), nano::test_genesis_key.pub, 500, key1.prv, key1.pub, system.work.generate (open1.hash ()));

nano::open_block open2 (send4.hash (), nano::genesis_account, key2.pub, key2.prv, key2.pub, system.work.generate (key2.pub));

nano::open_block open3 (send5.hash (), nano::genesis_account, key3.pub, key3.prv, key3.pub, system.work.generate (key3.pub));
nano::send_block send7 (open3.hash (), nano::test_genesis_key.pub, 500, key3.prv, key3.pub, system.work.generate (open3.hash ()));
nano::send_block send8 (send7.hash (), nano::test_genesis_key.pub, 200, key3.prv, key3.pub, system.work.generate (send7.hash ()));

{
auto transaction = node->store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send4).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send5).code);

ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send6).code);

ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open2).code);

ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open3).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send7).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send8).code);
}

auto transaction = node->store.tx_begin_read ();
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1));
constexpr auto num_accounts = 4;
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts);
// Check the order of accounts is as expected (greatest number of uncemented blocks at the front)
std::array<nano::account, num_accounts> desired_order { nano::genesis_account, key3.pub, key1.pub, key2.pub };
// clang-format off
std::equal (desired_order.begin (), desired_order.end (), node->active.priority_cementable_frontiers.begin (), node->active.priority_cementable_frontiers.end (), [](nano::account const & account, nano::cementable_account const & cementable_account) {
return (account == cementable_account.account);
});
// clang-format on
}
}

TEST (active_difficulty, recalculate_work)
{
Expand Down
120 changes: 97 additions & 23 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,58 @@ void nano::active_transactions::confirm_frontiers (nano::transaction const & tra
{
// Limit maximum count of elections to start
bool representative (node.config.enable_voting && node.wallets.reps_count > 0);
/* Check less frequently for non-representative nodes
~15 minutes for non-representative nodes, 3 minutes for representatives */
int representative_factor = representative ? 3 * 60 : 15 * 60;
/* Check less frequently for non-representative nodes */
auto representative_factor = representative ? 3min : 15min;
// Decrease check time for test network
int test_network_factor = node.network_params.network.is_test_network () ? 1000 : 1;
if (std::chrono::steady_clock::now () >= next_frontier_check)
auto is_test_network = node.network_params.network.is_test_network ();
int test_network_factor = is_test_network ? 1000 : 1;
auto roots_size = size ();
auto max_elections = (max_broadcast_queue / 4);
auto check_time_exceeded = std::chrono::steady_clock::now () >= next_frontier_check;
auto low_active_elections = roots_size < max_elections;
if (check_time_exceeded || (!is_test_network && low_active_elections))
{
size_t max_elections (max_broadcast_queue / 4);
// When the number of active elections is low increase max number of elections for setting confirmation height.
if (max_broadcast_queue > roots_size + max_elections)
{
max_elections = max_broadcast_queue - roots_size;
}

// Spend time prioritizing accounts to reduce voting traffic
prioritize_frontiers_for_confirmation (transaction_a, is_test_network ? std::chrono::milliseconds (50) : std::chrono::seconds (2));

size_t elections_count (0);
for (auto i (node.store.latest_begin (transaction_a, next_frontier_account)), n (node.store.latest_end ()); i != n && !stopped && elections_count < max_elections; ++i)
std::unique_lock<std::mutex> lock (mutex);
while (!priority_cementable_frontiers.empty () && !stopped && elections_count < max_elections)
{
nano::account_info const & info (i->second);
if (info.block_count != info.confirmation_height)
auto cementable_account = *priority_cementable_frontiers.begin ();
priority_cementable_frontiers.erase (priority_cementable_frontiers.begin ());
lock.unlock ();
nano::account_info info;
auto error = node.store.account_get (transaction_a, cementable_account.account, info);
release_assert (!error);

if (info.block_count > info.confirmation_height && !this->node.pending_confirmation_height.is_processing_block (info.head))
{
auto block (node.store.block_get (transaction_a, info.head));
if (!start (block))
auto block (this->node.store.block_get (transaction_a, info.head));
if (!this->start (block))
{
++elections_count;
// Calculate votes for local representatives
if (representative)
{
node.block_processor.generator.add (block->hash ());
this->node.block_processor.generator.add (block->hash ());
}
}
// Update next account
next_frontier_account = i->first.number () + 1;
}
lock.lock ();
}
lock.unlock ();

// 4 times slower check if all frontiers were confirmed
int fully_confirmed_factor = (elections_count <= max_elections) ? 4 : 1;
int fully_confirmed_factor = (elections_count < max_elections) ? 4 : 1;
// Calculate next check time
next_frontier_check = steady_clock::now () + seconds ((representative_factor * fully_confirmed_factor) / test_network_factor);
// Set next account to 0 if all frontiers were confirmed
next_frontier_account = (elections_count <= max_elections) ? 0 : next_frontier_account;
next_frontier_check = steady_clock::now () + (representative_factor * fully_confirmed_factor / test_network_factor);
}
}

Expand Down Expand Up @@ -313,15 +331,59 @@ void nano::active_transactions::request_loop ()
{
request_confirm (lock);
update_active_difficulty (lock);
const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2);
condition.wait_for (lock, std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay));
}
}

// This prevents unnecessary waiting if stopped is set in-between the above check and now
if (stopped)
void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds time_a)
{
// Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable
nano::timer<std::chrono::milliseconds> timer;
timer.start ();
if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off)
{
auto priority_cementable_frontiers_size = priority_cementable_frontiers.size ();
auto i (node.store.latest_begin (transaction_a, next_frontier_account));
auto n (node.store.latest_end ());
std::unique_lock<std::mutex> lock_a (mutex, std::defer_lock);
for (; i != n && !stopped && (priority_cementable_frontiers_size < max_priority_cementable_frontiers); ++i)
{
break;
auto const & account (i->first);
auto const & info (i->second);
if (info.block_count > info.confirmation_height && !node.pending_confirmation_height.is_processing_block (info.head))
{
lock_a.lock ();
// clang-format off
auto it = std::find_if (priority_cementable_frontiers.begin (), priority_cementable_frontiers.end (), [&account](auto const & cemented_frontier) {
return (account == cemented_frontier.account);
});
// clang-format on

auto num_uncemented = info.block_count - info.confirmation_height;
// If exists already then remove it first
if (it != priority_cementable_frontiers.end ())
{
priority_cementable_frontiers.erase (it);
}
priority_cementable_frontiers.emplace (account, num_uncemented);
priority_cementable_frontiers_size = priority_cementable_frontiers.size ();
lock_a.unlock ();
}

next_frontier_account = account.number () + 1;

if (timer.since_start () >= time_a)
{
break;
}
}

const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2);
condition.wait_for (lock, std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay));
// Go back to the beginning when we have reached the end of the accounts
if (i == n)
{
next_frontier_account = 0;
}
}
}

Expand Down Expand Up @@ -703,6 +765,17 @@ void nano::active_transactions::confirm_block (nano::block_hash const & hash_a)
}
}

size_t nano::active_transactions::priority_cementable_frontiers_size ()
{
std::lock_guard<std::mutex> guard (mutex);
return priority_cementable_frontiers.size ();
}

nano::cementable_account::cementable_account (nano::account const & account_a, size_t blocks_uncemented_a) :
account (account_a), blocks_uncemented (blocks_uncemented_a)
{
}

namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (active_transactions & active_transactions, const std::string & name)
Expand All @@ -722,6 +795,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (active_transaction
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
return composite;
}

Expand Down
25 changes: 24 additions & 1 deletion nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/lib/numbers.hpp>
#include <nano/lib/timer.hpp>
#include <nano/secure/common.hpp>

#include <boost/circular_buffer.hpp>
Expand All @@ -9,13 +10,15 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/pool/pool_alloc.hpp>
#include <boost/thread/thread.hpp>

#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <queue>
#include <set>
#include <unordered_map>

namespace nano
Expand Down Expand Up @@ -61,6 +64,15 @@ class transaction_counter final
std::mutex mutex;
};

class cementable_account final
{
public:
cementable_account (nano::account const & account_a, size_t blocks_uncemented_a);
nano::account account;
// This is just used for sorting
size_t blocks_uncemented{ 0 };
};

// Core class for determining consensus
// Holds all active blocks i.e. recently added blocks that need confirmation
class active_transactions final
Expand Down Expand Up @@ -123,6 +135,7 @@ class active_transactions final
static size_t constexpr max_broadcast_queue = 1000;
boost::circular_buffer<double> multipliers_cb;
uint64_t trended_active_difficulty;
size_t priority_cementable_frontiers_size ();

private:
// Call action with confirmed block, may be different than what we started with
Expand All @@ -137,8 +150,18 @@ class active_transactions final
std::condition_variable condition;
bool started{ false };
std::atomic<bool> stopped{ false };
static size_t constexpr confirmed_frontiers_max_pending_cut_off = 100;
// clang-format off
std::function<bool (nano::cementable_account const &, nano::cementable_account const &)> comp = [] (nano::cementable_account const & lhs, nano::cementable_account const & rhs) {
return lhs.blocks_uncemented < rhs.blocks_uncemented;
};
// clang-format on
void prioritize_frontiers_for_confirmation (nano::transaction const &, std::chrono::milliseconds);
std::set<nano::cementable_account, decltype (comp), boost::fast_pool_allocator<nano::cementable_account>> priority_cementable_frontiers{ comp };
static size_t constexpr max_priority_cementable_frontiers{ 100000 };
static size_t constexpr confirmed_frontiers_max_pending_cut_off{ 1000 };
boost::thread thread;

friend class confirmation_height_prioritize_frontiers_Test;
};

std::unique_ptr<seq_con_info_component> collect_seq_con_info (active_transactions & active_transactions, const std::string & name);
Expand Down