Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove election restart by difficulty #3280

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 0 additions & 57 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,63 +964,6 @@ TEST (active_transactions, confirm_new)
ASSERT_TIMELY (5s, node1.ledger.cache.cemented_count == 2 && node2.ledger.cache.cemented_count == 2);
}

TEST (active_transactions, restart_dropped)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node = *system.add_node (node_config);
nano::genesis genesis;
auto send = nano::state_block_builder ()
.account (nano::dev_genesis_key.pub)
.previous (genesis.hash ())
.representative (nano::dev_genesis_key.pub)
.balance (nano::genesis_amount - nano::xrb_ratio)
.link (nano::dev_genesis_key.pub)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (genesis.hash ()))
.build_shared (); // Process only in ledger and simulate dropping the election
ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
// Generate higher difficulty work
ASSERT_TRUE (node.work_generate_blocking (*send, send->difficulty () + 1).is_initialized ());
// Process the same block with updated work
ASSERT_EQ (0, node.active.size ());
node.process_active (send);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart));
auto ledger_block (node.store.block_get (node.store.tx_begin_read (), send->hash ()));
ASSERT_NE (nullptr, ledger_block);
// Exact same block, including work value must have been re-written
ASSERT_EQ (*send, *ledger_block);
// Drop election
node.active.erase (*send);
ASSERT_EQ (0, node.active.size ());
// Try to restart election with the same difficulty
node.process_active (send);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (0, node.active.size ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart));
// Generate even higher difficulty work
ASSERT_TRUE (node.work_generate_blocking (*send, send->difficulty () + 1).is_initialized ());
// Add voting
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
// Process the same block with updated work
ASSERT_EQ (0, node.active.size ());
node.process_active (send);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (1, node.ledger.cache.cemented_count);
ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::election_restart));
// Wait for the election to complete
ASSERT_TIMELY (5s, node.ledger.cache.cemented_count == 2);
// Verify the block is eventually updated in the ledger
ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), send->hash ())->block_work () == send->block_work ());
}

// Ensures votes are tallied on election::publish even if no vote is inserted through inactive_votes_cache
TEST (active_transactions, conflicting_block_vote_existing_election)
{
Expand Down
10 changes: 0 additions & 10 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4586,8 +4586,6 @@ TEST (node, deferred_dependent_elections)
node.process_local (open);
node.block_processor.flush ();
ASSERT_FALSE (node.active.active (open->qualified_root ()));
/// However, work is still updated
ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), open->hash ())->block_work () == open->block_work ());

// It is however possible to manually start an election from elsewhere
node.block_confirm (open);
Expand All @@ -4601,8 +4599,6 @@ TEST (node, deferred_dependent_elections)
node.process_local (open);
node.block_processor.flush ();
ASSERT_FALSE (node.active.active (open->qualified_root ()));
/// However, work is still updated
ASSERT_TIMELY (3s, node.store.block_get (node.store.tx_begin_read (), open->hash ())->block_work () == open->block_work ());

// Frontier confirmation also starts elections
ASSERT_NO_ERROR (system.poll_until_true (5s, [&node, &send2] {
Expand Down Expand Up @@ -4652,12 +4648,6 @@ TEST (node, deferred_dependent_elections)
election_send2->force_confirm ();
ASSERT_TIMELY (2s, node.block_confirmed (send2->hash ()));
ASSERT_TIMELY (2s, node.active.active (receive->qualified_root ()));
node.active.erase (*receive);
ASSERT_FALSE (node.active.active (receive->qualified_root ()));
node.work_generate_blocking (*receive, receive->difficulty () + 1);
node.process_local (receive);
node.block_processor.flush ();
ASSERT_TRUE (node.active.active (receive->qualified_root ()));
}
}

Expand Down
28 changes: 0 additions & 28 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,34 +963,6 @@ std::shared_ptr<nano::block> nano::active_transactions::winner (nano::block_hash
return result;
}

void nano::active_transactions::restart (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a)
{
auto hash (block_a->hash ());
auto ledger_block (node.store.block_get (transaction_a, hash));
if (ledger_block != nullptr && ledger_block->block_work () != block_a->block_work () && !node.block_confirmed_or_being_confirmed (transaction_a, hash))
{
if (block_a->difficulty () > ledger_block->difficulty ())
{
// Re-writing the block is necessary to avoid the same work being received later to force restarting the election
// The existing block is re-written, not the arriving block, as that one might not have gone through a full signature check
ledger_block->block_work_set (block_a->block_work ());

// Deferred write
node.block_processor.update (ledger_block);

// Restart election for the upgraded block, previously dropped from elections
if (node.ledger.dependents_confirmed (transaction_a, *ledger_block))
{
node.stats.inc (nano::stat::type::election, nano::stat::detail::election_restart);
auto previous_balance = node.ledger.balance (transaction_a, ledger_block->previous ());
auto block_has_account = ledger_block->type () == nano::block_type::state || ledger_block->type () == nano::block_type::open;
auto account = block_has_account ? ledger_block->account () : ledger_block->sideband ().account;
scheduler.activate (account, transaction_a);
}
}
}
}

std::deque<nano::election_status> nano::active_transactions::list_recently_cemented ()
{
nano::lock_guard<nano::mutex> lock (mutex);
Expand Down
2 changes: 0 additions & 2 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ class active_transactions final
bool active (nano::qualified_root const &);
std::shared_ptr<nano::election> election (nano::qualified_root const &) const;
std::shared_ptr<nano::block> winner (nano::block_hash const &) const;
// Returns false if the election was restarted
void restart (nano::transaction const &, std::shared_ptr<nano::block> const &);
// Returns a list of elections sorted by difficulty
std::vector<std::shared_ptr<nano::election>> list_active (size_t = std::numeric_limits<size_t>::max ());
void erase (nano::block const &);
Expand Down
118 changes: 44 additions & 74 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,6 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
condition.notify_all ();
}

void nano::block_processor::update (std::shared_ptr<nano::block> const & block_a)
{
{
nano::lock_guard<nano::mutex> lock (mutex);
updates.push_back (block_a);
}
condition.notify_all ();
}

void nano::block_processor::wait_write ()
{
nano::lock_guard<nano::mutex> lock (mutex);
Expand Down Expand Up @@ -188,7 +179,7 @@ bool nano::block_processor::should_log ()
bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty () || !updates.empty ();
return !blocks.empty () || !forced.empty ();
}

bool nano::block_processor::have_blocks ()
Expand Down Expand Up @@ -245,82 +236,67 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
lock_a.lock ();
timer_l.start ();
// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0), number_of_updates_processed (0);
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ())
{
if ((blocks.size () + state_block_signature_verification.size () + forced.size () + updates.size () > 64) && should_log ())
if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced, %4% updates) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size () % updates.size ()));
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ()));
}
if (!updates.empty ())
nano::unchecked_info info;
nano::block_hash hash (0);
bool force (false);
if (forced.empty ())
{
auto block (updates.front ());
updates.pop_front ();
lock_a.unlock ();
auto hash (block->hash ());
if (node.store.block_exists (transaction, hash))
{
node.store.block_put (transaction, hash, *block);
}
++number_of_updates_processed;
info = blocks.front ();
blocks.pop_front ();
hash = info.block->hash ();
}
else
{
nano::unchecked_info info;
nano::block_hash hash (0);
bool force (false);
if (forced.empty ())
{
info = blocks.front ();
blocks.pop_front ();
hash = info.block->hash ();
}
else
{
info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown);
forced.pop_front ();
hash = info.block->hash ();
force = true;
number_of_forced_processed++;
}
lock_a.unlock ();
if (force)
info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown);
forced.pop_front ();
hash = info.block->hash ();
force = true;
number_of_forced_processed++;
}
lock_a.unlock ();
if (force)
{
auto successor (node.ledger.successor (transaction, info.block->qualified_root ()));
if (successor != nullptr && successor->hash () != hash)
{
auto successor (node.ledger.successor (transaction, info.block->qualified_root ()));
if (successor != nullptr && successor->hash () != hash)
// Replace our block with the winner and roll back any dependent blocks
if (node.config.logging.ledger_rollback_logging ())
{
// Replace our block with the winner and roll back any dependent blocks
if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()));
}
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ()));
}
else if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()));
}
// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
node.logger.always_log (boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()));
}
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.logger.always_log (nano::severity_level::error, boost::str (boost::format ("Failed to roll back %1% because it or a successor was confirmed") % successor->hash ().to_string ()));
}
else if (node.config.logging.ledger_rollback_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()));
}
// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
{
node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.active.erase (*i);
}
node.active.erase (*i);
}
}
}
number_of_blocks_processed++;
process_one (transaction, post_events, info, force);
}
number_of_blocks_processed++;
process_one (transaction, post_events, info, force);
lock_a.lock ();
}
awaiting_write = false;
Expand Down Expand Up @@ -454,7 +430,6 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
{
node.logger.try_log (boost::str (boost::format ("Old for: %1%") % hash.to_string ()));
}
events_a.events.emplace_back ([this, block = info_a.block, origin_a] (nano::transaction const & post_event_transaction_a) { process_old (post_event_transaction_a, block, origin_a); });
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
break;
}
Expand Down Expand Up @@ -541,11 +516,6 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
return result;
}

void nano::block_processor::process_old (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a, nano::block_origin const origin_a)
{
node.active.restart (transaction_a, block_a);
}

void nano::block_processor::queue_unchecked (nano::write_transaction const & transaction_a, nano::hash_or_account const & hash_or_account_a)
{
auto unchecked_blocks (node.store.unchecked_get (transaction_a, hash_or_account_a.hash));
Expand Down
3 changes: 0 additions & 3 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class block_processor final
void add (nano::unchecked_info const &);
void add (std::shared_ptr<nano::block> const &, uint64_t = 0);
void force (std::shared_ptr<nano::block> const &);
void update (std::shared_ptr<nano::block> const &);
void wait_write ();
bool should_log ();
bool have_blocks_ready ();
Expand All @@ -74,7 +73,6 @@ class block_processor final
void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &);
void process_batch (nano::unique_lock<nano::mutex> &);
void process_live (nano::transaction const &, nano::block_hash const &, std::shared_ptr<nano::block> const &, nano::process_return const &, nano::block_origin const = nano::block_origin::remote);
void process_old (nano::transaction const &, std::shared_ptr<nano::block> const &, nano::block_origin const);
void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &);
void process_verified_state_blocks (std::deque<nano::unchecked_info> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
bool stopped{ false };
Expand All @@ -83,7 +81,6 @@ class block_processor final
std::chrono::steady_clock::time_point next_log;
std::deque<nano::unchecked_info> blocks;
std::deque<std::shared_ptr<nano::block>> forced;
std::deque<std::shared_ptr<nano::block>> updates;
nano::condition_variable condition;
nano::node & node;
nano::write_database_queue & write_database_queue;
Expand Down