diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index bb64797d3e..317b98dabc 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1439,23 +1439,20 @@ TEST (confirmation_height, single) } // Check confirmation heights after - { - auto transaction = system.nodes[0]->store.tx_begin_write (); - ASSERT_FALSE (system.nodes[0]->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); - ASSERT_EQ (2, account_info.confirmation_height); + auto transaction = system.nodes[0]->store.tx_begin_write (); + ASSERT_FALSE (system.nodes[0]->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); + ASSERT_EQ (2, account_info.confirmation_height); - auto transaction1 = system.nodes[1]->store.tx_begin_read (); - ASSERT_FALSE (system.nodes[1]->store.account_get (transaction1, nano::test_genesis_key.pub, account_info)); - ASSERT_EQ (2, account_info.confirmation_height); + auto transaction1 = system.nodes[1]->store.tx_begin_read (); + ASSERT_FALSE (system.nodes[1]->store.account_get (transaction1, nano::test_genesis_key.pub, account_info)); + ASSERT_EQ (2, account_info.confirmation_height); - // Rollback should fail as this transaction has been cemented - ASSERT_TRUE (system.nodes[0]->ledger.rollback (transaction, block1->hash ())); - } + // Rollback should fail as this transaction has been cemented + ASSERT_TRUE (system.nodes[0]->ledger.rollback (transaction, block1->hash ())); } -TEST (confirmation_height, multiple) +TEST (confirmation_height, multiple_accounts) { - auto amount (std::numeric_limits::max ()); bool delay_frontier_confirmation_height_updating = true; nano::system system (24000, 2, delay_frontier_confirmation_height_updating); nano::keypair key1; @@ -1468,9 +1465,9 @@ TEST (confirmation_height, multiple) system.wallet (1)->insert_adhoc (key3.prv); // Send to all accounts - nano::send_block send1 (latest1, key1.pub, 60000 * nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1)); - nano::send_block send2 (send1.hash (), key2.pub, 60000 * nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ())); - nano::send_block send3 (send2.hash (), key3.pub, 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send2.hash ())); + nano::send_block send1 (latest1, key1.pub, system.nodes.front ()->config.online_weight_minimum.number () + 300, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1)); + nano::send_block send2 (send1.hash (), key2.pub, system.nodes.front ()->config.online_weight_minimum.number () + 200, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ())); + nano::send_block send3 (send2.hash (), key3.pub, system.nodes.front ()->config.online_weight_minimum.number () + 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send2.hash ())); // Open all accounts nano::open_block open1 (send1.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)); @@ -1552,6 +1549,8 @@ TEST (confirmation_height, multiple) ASSERT_EQ (2, account_info.confirmation_height); ASSERT_EQ (2, account_info.block_count); + ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), 10); + // The accounts for key1 and key2 have 1 more block in the chain than is confirmed. // So this can be rolled back, but the one before that cannot. Check that this is the case { @@ -1642,7 +1641,6 @@ TEST (confirmation_height, gap_live) nano::system system (24000, 2, delay_frontier_confirmation_height_updating); nano::keypair destination; system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); system.wallet (1)->insert_adhoc (destination.prv); nano::genesis genesis; @@ -1710,6 +1708,232 @@ TEST (confirmation_height, gap_live) } } +TEST (confirmation_height, send_receive_between_2_accounts) +{ + bool delay_frontier_confirmation_height_updating = true; + nano::system system (24000, 1, delay_frontier_confirmation_height_updating); + auto node = system.nodes.front (); + nano::keypair key1; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest (node->latest (nano::test_genesis_key.pub)); + system.wallet (0)->insert_adhoc (key1.prv); + + nano::send_block send1 (latest, key1.pub, node->config.online_weight_minimum.number () + 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)); + nano::open_block open1 (send1.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)); + + nano::send_block send2 (open1.hash (), nano::genesis_account, 1000, key1.prv, key1.pub, system.work.generate (open1.hash ())); + nano::send_block send3 (send2.hash (), nano::genesis_account, 900, key1.prv, key1.pub, system.work.generate (send2.hash ())); + nano::send_block send4 (send3.hash (), nano::genesis_account, 500, key1.prv, key1.pub, system.work.generate (send3.hash ())); + + nano::receive_block receive1 (send1.hash (), send2.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ())); + nano::receive_block receive2 (receive1.hash (), send3.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive1.hash ())); + nano::receive_block receive3 (receive2.hash (), send4.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive2.hash ())); + + nano::send_block send5 (receive3.hash (), key1.pub, node->config.online_weight_minimum.number () + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive3.hash ())); + auto receive4 = std::make_shared (send4.hash (), send5.hash (), key1.prv, key1.pub, system.work.generate (send4.hash ())); + // Unpocketed send + nano::keypair key2; + nano::send_block send6 (send5.hash (), key2.pub, node->config.online_weight_minimum.number (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send5.hash ())); + { + auto transaction = node->store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open1).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive1).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send4).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive3).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send5).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send6).code); + } + + node->process_active (receive4); + node->block_processor.flush (); + + system.deadline_set (10s); + while (true) + { + auto transaction = node->store.tx_begin_read (); + if (node->ledger.block_confirmed (transaction, receive4->hash ())) + { + break; + } + + ASSERT_NO_ERROR (system.poll ()); + } + + auto transaction (node->store.tx_begin ()); + + nano::account_info account_info; + ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); + ASSERT_EQ (6, account_info.confirmation_height); + ASSERT_EQ (7, account_info.block_count); + + ASSERT_FALSE (node->store.account_get (transaction, key1.pub, account_info)); + ASSERT_EQ (5, account_info.confirmation_height); + ASSERT_EQ (5, account_info.block_count); + + ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), 10); +} + +TEST (confirmation_height, send_receive_self) +{ + bool delay_frontier_confirmation_height_updating = true; + nano::system system (24000, 1, delay_frontier_confirmation_height_updating); + auto node = system.nodes.front (); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest (node->latest (nano::test_genesis_key.pub)); + + nano::send_block send1 (latest, nano::test_genesis_key.pub, nano::genesis_amount - 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)); + nano::receive_block receive1 (send1.hash (), send1.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ())); + nano::send_block send2 (receive1.hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive1.hash ())); + nano::send_block send3 (send2.hash (), nano::test_genesis_key.pub, nano::genesis_amount - 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send2.hash ())); + + nano::receive_block receive2 (send3.hash (), send2.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send3.hash ())); + auto receive3 = std::make_shared (receive2.hash (), send3.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive2.hash ())); + + // Send to another account to prevent automatic receiving on the genesis account + nano::keypair key1; + nano::send_block send4 (receive3->hash (), key1.pub, node->config.online_weight_minimum.number (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive3->hash ())); + { + auto transaction = node->store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *receive3).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send4).code); + } + + node->block_confirm (receive3); + + system.deadline_set (10s); + while (true) + { + auto transaction = node->store.tx_begin_read (); + if (node->ledger.block_confirmed (transaction, receive3->hash ())) + { + break; + } + + ASSERT_NO_ERROR (system.poll ()); + } + + auto transaction (node->store.tx_begin ()); + nano::account_info account_info; + ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); + ASSERT_EQ (7, account_info.confirmation_height); + ASSERT_EQ (8, account_info.block_count); + ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), 6); +} + +TEST (confirmation_height, all_block_types) +{ + bool delay_frontier_confirmation_height_updating = true; + nano::system system (24000, 1, delay_frontier_confirmation_height_updating); + auto node = system.nodes.front (); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest (node->latest (nano::test_genesis_key.pub)); + nano::keypair key1; + nano::keypair key2; + auto & store = node->store; + nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)); + nano::send_block send1 (send.hash (), key2.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send.hash ())); + + nano::open_block open (send.hash (), nano::test_genesis_key.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)); + nano::state_block state_open (key2.pub, 0, 0, nano::Gxrb_ratio, send1.hash (), key2.prv, key2.pub, system.work.generate (key2.pub)); + + nano::send_block send2 (open.hash (), key2.pub, 0, key1.prv, key1.pub, system.work.generate (open.hash ())); + nano::state_block state_receive (key2.pub, state_open.hash (), 0, nano::Gxrb_ratio * 2, send2.hash (), key2.prv, key2.pub, system.work.generate (state_open.hash ())); + + nano::state_block state_send (key2.pub, state_receive.hash (), 0, nano::Gxrb_ratio, key1.pub, key2.prv, key2.pub, system.work.generate (state_receive.hash ())); + nano::receive_block receive (send2.hash (), state_send.hash (), key1.prv, key1.pub, system.work.generate (send2.hash ())); + + nano::change_block change (receive.hash (), key2.pub, key1.prv, key1.pub, system.work.generate (receive.hash ())); + + nano::state_block state_change (key2.pub, state_send.hash (), nano::test_genesis_key.pub, nano::Gxrb_ratio, 0, key2.prv, key2.pub, system.work.generate (state_send.hash ())); + + nano::keypair epoch_key; + node->ledger.epoch_signer = epoch_key.pub; + + nano::state_block epoch (key2.pub, state_change.hash (), nano::test_genesis_key.pub, nano::Gxrb_ratio, node->ledger.epoch_link, epoch_key.prv, epoch_key.pub, system.work.generate (state_change.hash ())); + + nano::state_block epoch1 (key1.pub, change.hash (), key2.pub, nano::Gxrb_ratio, node->ledger.epoch_link, epoch_key.prv, epoch_key.pub, system.work.generate (change.hash ())); + nano::state_block state_send1 (key1.pub, epoch1.hash (), 0, nano::Gxrb_ratio - 1, key2.pub, key1.prv, key1.pub, system.work.generate (epoch1.hash ())); + nano::state_block state_receive2 (key2.pub, epoch.hash (), 0, nano::Gxrb_ratio + 1, state_send1.hash (), key2.prv, key2.pub, system.work.generate (epoch.hash ())); + + auto state_send2 = std::make_shared (key2.pub, state_receive2.hash (), 0, nano::Gxrb_ratio, key1.pub, key2.prv, key2.pub, system.work.generate (state_receive2.hash ())); + nano::state_block state_send3 (key2.pub, state_send2->hash (), 0, nano::Gxrb_ratio - 1, key1.pub, key2.prv, key2.pub, system.work.generate (state_send2->hash ())); + + nano::state_block state_send4 (key1.pub, state_send1.hash (), 0, nano::Gxrb_ratio - 2, nano::test_genesis_key.pub, key1.prv, key1.pub, system.work.generate (state_send1.hash ())); + nano::state_block state_receive3 (nano::genesis_account, send1.hash (), nano::genesis_account, nano::genesis_amount - nano::Gxrb_ratio * 2 + 1, state_send4.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ())); + + { + auto transaction (store.tx_begin_write ()); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_open).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_receive).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_send).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, change).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_change).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, epoch).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, epoch1).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_receive2).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *state_send2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_send3).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_send4).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, state_receive3).code); + } + + node->block_confirm (state_send2); + + system.deadline_set (10s); + while (true) + { + auto transaction = node->store.tx_begin_read (); + if (node->ledger.block_confirmed (transaction, state_send2->hash ())) + { + break; + } + + ASSERT_NO_ERROR (system.poll ()); + } + + auto transaction (node->store.tx_begin ()); + nano::account_info account_info; + ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); + ASSERT_EQ (3, account_info.confirmation_height); + ASSERT_LE (4, account_info.block_count); + + ASSERT_FALSE (node->store.account_get (transaction, key1.pub, account_info)); + ASSERT_EQ (6, account_info.confirmation_height); + ASSERT_LE (7, account_info.block_count); + + ASSERT_FALSE (node->store.account_get (transaction, key2.pub, account_info)); + ASSERT_EQ (7, account_info.confirmation_height); + ASSERT_LE (8, account_info.block_count); + + ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), 15); +} + TEST (bootstrap, tcp_listener_timeout_empty) { nano::system system (24000, 1); diff --git a/nano/lib/errors.cpp b/nano/lib/errors.cpp index 705ab4595e..c3776ab395 100644 --- a/nano/lib/errors.cpp +++ b/nano/lib/errors.cpp @@ -146,6 +146,8 @@ std::string nano::error_rpc_messages::message (int ev) const return "Representative account and previous hash required"; case nano::error_rpc::block_create_requirements_send: return "Destination account, previous hash, current balance and amount required"; + case nano::error_rpc::confirmation_height_not_processing: + return "There are no blocks currently being processed for adding confirmation height"; case nano::error_rpc::confirmation_not_found: return "Active confirmation not found"; case nano::error_rpc::difficulty_limit: diff --git a/nano/lib/errors.hpp b/nano/lib/errors.hpp index dcf57556f1..41a0dfe435 100644 --- a/nano/lib/errors.hpp +++ b/nano/lib/errors.hpp @@ -89,6 +89,7 @@ enum class error_rpc block_create_requirements_receive, block_create_requirements_change, block_create_requirements_send, + confirmation_height_not_processing, confirmation_not_found, difficulty_limit, invalid_balance, diff --git a/nano/lib/utility.cpp b/nano/lib/utility.cpp index 75035bf229..57b6993fec 100644 --- a/nano/lib/utility.cpp +++ b/nano/lib/utility.cpp @@ -105,6 +105,9 @@ namespace thread_role case nano::thread_role::name::rpc_process_container: thread_role_name_string = "RPC process"; break; + case nano::thread_role::name::confirmation_height_processing: + thread_role_name_string = "Conf height"; + break; } /* diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index 1bba5bfa96..62c2a443b7 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -88,7 +88,8 @@ namespace thread_role signature_checking, rpc_request_processor, rpc_process_container, - work_watcher + work_watcher, + confirmation_height_processing }; /* * Get/Set the identifier for the current thread diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index 00029d3cf1..8301cea3c9 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -98,7 +98,7 @@ int main (int argc, char * const * argv) ("debug_random_feed", "Generates output to RNG test suites") ("debug_validate_blocks", "Check all blocks for correct hash, signature, work value") ("debug_peers", "Display peer IPv6:port connections") - ("debug_ipc", "Read an IPC command in JSON from stdin and invoke it. Network operations will have no effect") + ("debug_cemented_block_count", "Displays the number of cemented (confirmed) blocks") ("platform", boost::program_options::value (), "Defines the for OpenCL commands") ("device", boost::program_options::value (), "Defines for OpenCL command") ("threads", boost::program_options::value (), "Defines count for OpenCL command") @@ -960,6 +960,19 @@ int main (int argc, char * const * argv) std::cout << boost::str (boost::format ("%1%\n") % nano::endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ())); } } + else if (vm.count ("debug_cemented_block_count")) + { + nano::inactive_node node (data_path); + auto transaction (node.node->store.tx_begin ()); + + uint64_t sum = 0; + for (auto i (node.node->store.latest_begin (transaction)), n (node.node->store.latest_end ()); i != n; ++i) + { + nano::account_info info (i->second); + sum += info.confirmation_height; + } + std::cout << "Total cemented block count: " << sum << std::endl; + } else if (vm.count ("version")) { if (NANO_VERSION_PATCH == 0) diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 9ecc3a8672..2d5d346147 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -12,6 +12,9 @@ endif () add_library (node ${platform_sources} + active_transactions.hpp + active_transactions.cpp + blockprocessor.cpp blockprocessor.hpp blockprocessor.cpp bootstrap.hpp @@ -20,8 +23,10 @@ add_library (node cli.cpp common.hpp common.cpp - daemonconfig.cpp + confirmation_height_processor.hpp + confirmation_height_processor.cpp daemonconfig.hpp + daemonconfig.cpp ipc.hpp ipc.cpp ipcconfig.hpp diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp new file mode 100644 index 0000000000..66dbc35b3c --- /dev/null +++ b/nano/node/active_transactions.cpp @@ -0,0 +1,640 @@ +#include + +#include +#include + +size_t constexpr nano::active_transactions::max_broadcast_queue; + +using namespace std::chrono; + +nano::active_transactions::active_transactions (nano::node & node_a, bool delay_frontier_confirmation_height_updating) : +node (node_a), +difficulty_cb (20, node.network_params.network.publish_threshold), +trended_active_difficulty (node.network_params.network.publish_threshold), +next_frontier_check (steady_clock::now () + (delay_frontier_confirmation_height_updating ? 60s : 0s)), +thread ([this]() { + nano::thread_role::set (nano::thread_role::name::request_loop); + request_loop (); +}) +{ + std::unique_lock lock (mutex); + while (!started) + { + condition.wait (lock); + } +} + +nano::active_transactions::~active_transactions () +{ + stop (); +} + +void nano::active_transactions::confirm_frontiers (nano::transaction const & transaction_a) +{ + // Limit maximum count of elections to start + bool representative (node.config.enable_voting && node.wallets.reps_count > 0); + /* Check less frequently for non-representative nodes + ~15 minutes for non-representative nodes, 3 minutes for representatives */ + int representative_factor = representative ? 3 * 60 : 15 * 60; + // Decrease check time for test network + int test_network_factor = node.network_params.network.is_test_network () ? 1000 : 1; + if (std::chrono::steady_clock::now () >= next_frontier_check) + { + size_t max_elections (max_broadcast_queue / 4); + size_t elections_count (0); + for (auto i (node.store.latest_begin (transaction_a, next_frontier_account)), n (node.store.latest_end ()); i != n && !stopped && elections_count < max_elections; ++i) + { + nano::account_info info (i->second); + if (info.block_count != info.confirmation_height) + { + auto block (node.store.block_get (transaction_a, info.head)); + if (!start (block)) + { + ++elections_count; + // Calculate votes for local representatives + if (representative) + { + node.block_processor.generator.add (block->hash ()); + } + } + // Update next account + next_frontier_account = i->first.number () + 1; + } + } + // 4 times slower check if all frontiers were confirmed + int fully_confirmed_factor = (elections_count <= max_elections) ? 4 : 1; + // Calculate next check time + next_frontier_check = steady_clock::now () + seconds ((representative_factor * fully_confirmed_factor) / test_network_factor); + // Set next account to 0 if all frontiers were confirmed + next_frontier_account = (elections_count <= max_elections) ? 0 : next_frontier_account; + } +} + +void nano::active_transactions::request_confirm (std::unique_lock & lock_a) +{ + std::unordered_set inactive; + auto transaction (node.store.tx_begin_read ()); + unsigned unconfirmed_count (0); + unsigned unconfirmed_announcements (0); + std::unordered_map, std::vector>> requests_bundle; + std::deque> rebroadcast_bundle; + std::deque, std::shared_ptr>>>> confirm_req_bundle; + + auto roots_size (roots.size ()); + for (auto i (roots.get<1> ().begin ()), n (roots.get<1> ().end ()); i != n; ++i) + { + auto root (i->root); + auto election_l (i->election); + if ((election_l->confirmed || election_l->stopped) && election_l->announcements >= announcement_min - 1) + { + if (election_l->confirmed) + { + confirmed.push_back (election_l->status); + if (confirmed.size () > election_history_size) + { + confirmed.pop_front (); + } + } + inactive.insert (root); + } + else + { + if (election_l->announcements > announcement_long) + { + ++unconfirmed_count; + unconfirmed_announcements += election_l->announcements; + // Log votes for very long unconfirmed elections + if (election_l->announcements % 50 == 1) + { + auto tally_l (election_l->tally (transaction)); + election_l->log_votes (tally_l); + } + /* Escalation for long unconfirmed elections + Start new elections for previous block & source + if there are less than 100 active elections */ + if (election_l->announcements % announcement_long == 1 && roots_size < 100 && !node.network_params.network.is_test_network ()) + { + std::shared_ptr previous; + auto previous_hash (election_l->status.winner->previous ()); + if (!previous_hash.is_zero ()) + { + previous = node.store.block_get (transaction, previous_hash); + if (previous != nullptr) + { + add (std::move (previous)); + } + } + /* If previous block not existing/not commited yet, block_source can cause segfault for state blocks + So source check can be done only if previous != nullptr or previous is 0 (open account) */ + if (previous_hash.is_zero () || previous != nullptr) + { + auto source_hash (node.ledger.block_source (transaction, *election_l->status.winner)); + if (!source_hash.is_zero ()) + { + auto source (node.store.block_get (transaction, source_hash)); + if (source != nullptr) + { + add (std::move (source)); + } + } + } + election_l->update_dependent (); + } + } + if (election_l->announcements < announcement_long || election_l->announcements % announcement_long == 1) + { + if (node.ledger.could_fit (transaction, *election_l->status.winner)) + { + // Broadcast winner + if (rebroadcast_bundle.size () < max_broadcast_queue) + { + rebroadcast_bundle.push_back (election_l->status.winner); + } + } + else + { + if (election_l->announcements != 0) + { + election_l->stop (); + } + } + } + if (election_l->announcements % 4 == 1) + { + auto rep_channels (std::make_shared>> ()); + auto reps (node.rep_crawler.representatives (std::numeric_limits::max ())); + + // Add all rep endpoints that haven't already voted. We use a set since multiple + // reps may exist on an endpoint. + std::unordered_set> channels; + for (auto & rep : reps) + { + if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ()) + { + channels.insert (rep.channel); + + if (node.config.logging.vote_logging ()) + { + node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep.account.to_account ()); + } + } + } + + rep_channels->insert (rep_channels->end (), channels.begin (), channels.end ()); + + if ((!rep_channels->empty () && node.rep_crawler.total_weight () > node.config.online_weight_minimum.number ()) || roots_size > 5) + { + // broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing + if (!node.network_params.network.is_test_network ()) + { + if (confirm_req_bundle.size () < max_broadcast_queue) + { + confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, rep_channels)); + } + } + else + { + for (auto & rep : *rep_channels) + { + auto rep_request (requests_bundle.find (rep)); + auto block (election_l->status.winner); + auto root_hash (std::make_pair (block->hash (), block->root ())); + if (rep_request == requests_bundle.end ()) + { + if (requests_bundle.size () < max_broadcast_queue) + { + std::vector> insert_vector = { root_hash }; + requests_bundle.insert (std::make_pair (rep, insert_vector)); + } + } + else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max) + { + rep_request->second.push_back (root_hash); + } + } + } + } + else + { + if (!node.network_params.network.is_test_network ()) + { + auto deque_l (node.network.udp_channels.random_set (100)); + auto vec (std::make_shared>> ()); + for (auto i : deque_l) + { + vec->push_back (i); + } + confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, vec)); + } + else + { + for (auto & rep : *rep_channels) + { + auto rep_request (requests_bundle.find (rep)); + auto block (election_l->status.winner); + auto root_hash (std::make_pair (block->hash (), block->root ())); + if (rep_request == requests_bundle.end ()) + { + std::vector> insert_vector = { root_hash }; + requests_bundle.insert (std::make_pair (rep, insert_vector)); + } + else + { + rep_request->second.push_back (root_hash); + } + } + } + } + } + } + ++election_l->announcements; + } + lock_a.unlock (); + // Rebroadcast unconfirmed blocks + if (!rebroadcast_bundle.empty ()) + { + node.network.flood_block_batch (rebroadcast_bundle); + } + // Batch confirmation request + if (!node.network_params.network.is_live_network () && !requests_bundle.empty ()) + { + node.network.broadcast_confirm_req_batch (requests_bundle, 50); + } + //confirm_req broadcast + if (!confirm_req_bundle.empty ()) + { + node.network.broadcast_confirm_req_batch (confirm_req_bundle); + } + // Confirm frontiers when there aren't many confirmations already pending + if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off) + { + confirm_frontiers (transaction); + } + lock_a.lock (); + // Erase inactive elections + for (auto i (inactive.begin ()), n (inactive.end ()); i != n; ++i) + { + auto root_it (roots.find (*i)); + assert (root_it != roots.end ()); + for (auto & block : root_it->election->blocks) + { + auto erased (blocks.erase (block.first)); + (void)erased; + assert (erased == 1); + } + for (auto & dependent_block : root_it->election->dependent_blocks) + { + adjust_difficulty (dependent_block); + } + roots.erase (*i); + } + if (unconfirmed_count > 0) + { + node.logger.try_log (boost::str (boost::format ("%1% blocks have been unconfirmed averaging %2% announcements") % unconfirmed_count % (unconfirmed_announcements / unconfirmed_count))); + } +} + +void nano::active_transactions::request_loop () +{ + std::unique_lock lock (mutex); + started = true; + + lock.unlock (); + condition.notify_all (); + lock.lock (); + + while (!stopped) + { + request_confirm (lock); + update_active_difficulty (lock); + + // This prevents unnecessary waiting if stopped is set in-between the above check and now + if (stopped) + { + break; + } + + const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2); + condition.wait_for (lock, std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay)); + } +} + +void nano::active_transactions::stop () +{ + std::unique_lock lock (mutex); + while (!started) + { + condition.wait (lock); + } + stopped = true; + lock.unlock (); + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } + lock.lock (); + roots.clear (); +} + +bool nano::active_transactions::start (std::shared_ptr block_a, std::function)> const & confirmation_action_a) +{ + std::lock_guard lock (mutex); + return add (block_a, confirmation_action_a); +} + +bool nano::active_transactions::add (std::shared_ptr block_a, std::function)> const & confirmation_action_a) +{ + auto error (true); + if (!stopped) + { + auto root (block_a->qualified_root ()); + auto existing (roots.find (root)); + if (existing == roots.end ()) + { + auto election (std::make_shared (node, block_a, confirmation_action_a)); + uint64_t difficulty (0); + auto error (nano::work_validate (*block_a, &difficulty)); + release_assert (!error); + roots.insert (nano::conflict_info{ root, difficulty, difficulty, election }); + blocks.insert (std::make_pair (block_a->hash (), election)); + adjust_difficulty (block_a->hash ()); + } + error = existing != roots.end (); + } + return error; +} + +// Validate a vote and apply it to the current election if one exists +bool nano::active_transactions::vote (std::shared_ptr vote_a, bool single_lock) +{ + std::shared_ptr election; + bool replay (false); + bool processed (false); + { + std::unique_lock lock; + if (!single_lock) + { + lock = std::unique_lock (mutex); + } + for (auto vote_block : vote_a->blocks) + { + nano::election_vote_result result; + if (vote_block.which ()) + { + auto block_hash (boost::get (vote_block)); + auto existing (blocks.find (block_hash)); + if (existing != blocks.end ()) + { + result = existing->second->vote (vote_a->account, vote_a->sequence, block_hash); + } + } + else + { + auto block (boost::get> (vote_block)); + auto existing (roots.find (block->qualified_root ())); + if (existing != roots.end ()) + { + result = existing->election->vote (vote_a->account, vote_a->sequence, block->hash ()); + } + } + replay = replay || result.replay; + processed = processed || result.processed; + } + } + if (processed) + { + node.network.flood_vote (vote_a); + } + return replay; +} + +bool nano::active_transactions::active (nano::qualified_root const & root_a) +{ + std::lock_guard lock (mutex); + return roots.find (root_a) != roots.end (); +} + +bool nano::active_transactions::active (nano::block const & block_a) +{ + return active (block_a.qualified_root ()); +} + +void nano::active_transactions::update_difficulty (nano::block const & block_a) +{ + std::lock_guard lock (mutex); + auto existing (roots.find (block_a.qualified_root ())); + if (existing != roots.end ()) + { + uint64_t difficulty; + auto error (nano::work_validate (block_a, &difficulty)); + assert (!error); + if (difficulty > existing->difficulty) + { + roots.modify (existing, [difficulty](nano::conflict_info & info_a) { + info_a.difficulty = difficulty; + }); + adjust_difficulty (block_a.hash ()); + } + } +} + +void nano::active_transactions::adjust_difficulty (nano::block_hash const & hash_a) +{ + assert (!mutex.try_lock ()); + std::deque> remaining_blocks; + remaining_blocks.emplace_back (hash_a, 0); + std::unordered_set processed_blocks; + std::vector> elections_list; + uint128_t sum (0); + while (!remaining_blocks.empty ()) + { + auto const & item (remaining_blocks.front ()); + auto hash (item.first); + auto level (item.second); + if (processed_blocks.find (hash) == processed_blocks.end ()) + { + auto existing (blocks.find (hash)); + if (existing != blocks.end () && !existing->second->confirmed && !existing->second->stopped && existing->second->status.winner->hash () == hash) + { + auto previous (existing->second->status.winner->previous ()); + if (!previous.is_zero ()) + { + remaining_blocks.emplace_back (previous, level + 1); + } + auto source (existing->second->status.winner->source ()); + if (!source.is_zero () && source != previous) + { + remaining_blocks.emplace_back (source, level + 1); + } + auto link (existing->second->status.winner->link ()); + if (!link.is_zero () && !node.ledger.is_epoch_link (link) && link != previous) + { + remaining_blocks.emplace_back (link, level + 1); + } + for (auto & dependent_block : existing->second->dependent_blocks) + { + remaining_blocks.emplace_back (dependent_block, level - 1); + } + processed_blocks.insert (hash); + nano::qualified_root root (previous, existing->second->status.winner->root ()); + auto existing_root (roots.find (root)); + if (existing_root != roots.end ()) + { + sum += existing_root->difficulty; + elections_list.emplace_back (root, level); + } + } + } + remaining_blocks.pop_front (); + } + if (elections_list.size () > 1) + { + uint64_t average (static_cast (sum / elections_list.size ())); + // Potential overflow check + uint64_t divider (1); + if (elections_list.size () > 1000000 && (average - node.network_params.network.publish_threshold) > elections_list.size ()) + { + divider = ((average - node.network_params.network.publish_threshold) / elections_list.size ()) + 1; + } + // Set adjusted difficulty + for (auto & item : elections_list) + { + auto existing_root (roots.find (item.first)); + uint64_t difficulty_a (average + (item.second / divider)); + roots.modify (existing_root, [difficulty_a](nano::conflict_info & info_a) { + info_a.adjusted_difficulty = difficulty_a; + }); + } + } + // Set adjusted difficulty equals to difficulty + else if (elections_list.size () == 1) + { + auto existing_root (roots.find (elections_list.begin ()->first)); + if (existing_root->difficulty != existing_root->adjusted_difficulty) + { + roots.modify (existing_root, [](nano::conflict_info & info_a) { + info_a.adjusted_difficulty = info_a.difficulty; + }); + } + } +} + +void nano::active_transactions::update_active_difficulty (std::unique_lock & lock_a) +{ + assert (lock_a.mutex () == &mutex && lock_a.owns_lock ()); + uint64_t difficulty (node.network_params.network.publish_threshold); + if (!roots.empty ()) + { + uint128_t min = roots.get<1> ().begin ()->adjusted_difficulty; + assert (min >= node.network_params.network.publish_threshold); + uint128_t max = (--roots.get<1> ().end ())->adjusted_difficulty; + assert (max >= node.network_params.network.publish_threshold); + difficulty = static_cast ((min + max) / 2); + } + assert (difficulty >= node.network_params.network.publish_threshold); + difficulty_cb.push_front (difficulty); + auto sum (std::accumulate (node.active.difficulty_cb.begin (), node.active.difficulty_cb.end (), uint128_t (0))); + difficulty = static_cast (sum / difficulty_cb.size ()); + assert (difficulty >= node.network_params.network.publish_threshold); + trended_active_difficulty = difficulty; +} + +uint64_t nano::active_transactions::active_difficulty () +{ + std::lock_guard lock (mutex); + return trended_active_difficulty; +} + +// List of active blocks in elections +std::deque> nano::active_transactions::list_blocks (bool single_lock) +{ + std::deque> result; + std::unique_lock lock; + if (!single_lock) + { + lock = std::unique_lock (mutex); + } + for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i) + { + result.push_back (i->election->status.winner); + } + return result; +} + +std::deque nano::active_transactions::list_confirmed () +{ + std::lock_guard lock (mutex); + return confirmed; +} + +void nano::active_transactions::erase (nano::block const & block_a) +{ + std::lock_guard lock (mutex); + if (roots.find (block_a.qualified_root ()) != roots.end ()) + { + roots.erase (block_a.qualified_root ()); + node.logger.try_log (boost::str (boost::format ("Election erased for block block %1% root %2%") % block_a.hash ().to_string () % block_a.root ().to_string ())); + } +} + +bool nano::active_transactions::empty () +{ + std::lock_guard lock (mutex); + return roots.empty (); +} + +size_t nano::active_transactions::size () +{ + std::lock_guard lock (mutex); + return roots.size (); +} + +bool nano::active_transactions::publish (std::shared_ptr block_a) +{ + std::lock_guard lock (mutex); + auto existing (roots.find (block_a->qualified_root ())); + auto result (true); + if (existing != roots.end ()) + { + result = existing->election->publish (block_a); + if (!result) + { + blocks.insert (std::make_pair (block_a->hash (), existing->election)); + } + } + return result; +} + +void nano::active_transactions::confirm_block (nano::block_hash const & hash_a) +{ + std::lock_guard lock (mutex); + auto existing (blocks.find (hash_a)); + if (existing != blocks.end () && !existing->second->confirmed && !existing->second->stopped && existing->second->status.winner->hash () == hash_a) + { + existing->second->confirm_once (); + } +} + +namespace nano +{ +std::unique_ptr collect_seq_con_info (active_transactions & active_transactions, const std::string & name) +{ + size_t roots_count = 0; + size_t blocks_count = 0; + size_t confirmed_count = 0; + + { + std::lock_guard guard (active_transactions.mutex); + roots_count = active_transactions.roots.size (); + blocks_count = active_transactions.blocks.size (); + confirmed_count = active_transactions.confirmed.size (); + } + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (seq_con_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) })); + composite->add_component (std::make_unique (seq_con_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) })); + composite->add_component (std::make_unique (seq_con_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) })); + return composite; +} +} diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp new file mode 100644 index 0000000000..ded2a33f19 --- /dev/null +++ b/nano/node/active_transactions.hpp @@ -0,0 +1,119 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace nano +{ +class node; +class block; +class vote; +class election; +class transaction; + +class conflict_info final +{ +public: + nano::qualified_root root; + uint64_t difficulty; + uint64_t adjusted_difficulty; + std::shared_ptr election; +}; + +class election_status final +{ +public: + std::shared_ptr winner; + nano::amount tally; + std::chrono::milliseconds election_end; + std::chrono::milliseconds election_duration; +}; + +// Core class for determining consensus +// Holds all active blocks i.e. recently added blocks that need confirmation +class active_transactions final +{ +public: + explicit active_transactions (nano::node &, bool delay_frontier_confirmation_height_updating = false); + ~active_transactions (); + // Start an election for a block + // Call action with confirmed block, may be different than what we started with + // clang-format off + bool start (std::shared_ptr, std::function)> const & = [](std::shared_ptr) {}); + // clang-format on + // If this returns true, the vote is a replay + // If this returns false, the vote may or may not be a replay + bool vote (std::shared_ptr, bool = false); + // Is the root of this block in the roots container + bool active (nano::block const &); + bool active (nano::qualified_root const &); + void update_difficulty (nano::block const &); + void adjust_difficulty (nano::block_hash const &); + void update_active_difficulty (std::unique_lock &); + uint64_t active_difficulty (); + std::deque> list_blocks (bool = false); + void erase (nano::block const &); + bool empty (); + size_t size (); + void stop (); + bool publish (std::shared_ptr block_a); + void confirm_block (nano::block_hash const &); + boost::multi_index_container< + nano::conflict_info, + boost::multi_index::indexed_by< + boost::multi_index::hashed_unique< + boost::multi_index::member>, + boost::multi_index::ordered_non_unique< + boost::multi_index::member, + std::greater>>> + roots; + std::unordered_map> blocks; + std::deque list_confirmed (); + std::deque confirmed; + nano::node & node; + std::mutex mutex; + // Maximum number of conflicts to vote on per interval, lowest root hash first + static unsigned constexpr announcements_per_interval = 32; + // Minimum number of block announcements + static unsigned constexpr announcement_min = 2; + // Threshold to start logging blocks haven't yet been confirmed + static unsigned constexpr announcement_long = 20; + static size_t constexpr election_history_size = 2048; + static size_t constexpr max_broadcast_queue = 1000; + boost::circular_buffer difficulty_cb; + uint64_t trended_active_difficulty; + +private: + // Call action with confirmed block, may be different than what we started with + // clang-format off + bool add (std::shared_ptr, std::function)> const & = [](std::shared_ptr) {}); + // clang-format on + void request_loop (); + void request_confirm (std::unique_lock &); + void confirm_frontiers (nano::transaction const &); + nano::account next_frontier_account{ 0 }; + std::chrono::steady_clock::time_point next_frontier_check{ std::chrono::steady_clock::now () }; + std::condition_variable condition; + bool started{ false }; + std::atomic stopped{ false }; + static size_t constexpr confirmed_frontiers_max_pending_cut_off = 100; + boost::thread thread; +}; + +std::unique_ptr collect_seq_con_info (active_transactions & active_transactions, const std::string & name); +} diff --git a/nano/node/confirmation_height_processor.cpp b/nano/node/confirmation_height_processor.cpp new file mode 100644 index 0000000000..20e23d503a --- /dev/null +++ b/nano/node/confirmation_height_processor.cpp @@ -0,0 +1,378 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +nano::confirmation_height_processor::confirmation_height_processor (nano::pending_confirmation_height & pending_confirmation_height_a, nano::block_store & store_a, nano::stat & stats_a, nano::active_transactions & active_a, nano::block_hash const & epoch_link_a, nano::logger_mt & logger_a) : +pending_confirmations (pending_confirmation_height_a), +store (store_a), +stats (stats_a), +active (active_a), +epoch_link (epoch_link_a), +logger (logger_a), +thread ([this]() { + nano::thread_role::set (nano::thread_role::name::confirmation_height_processing); + this->run (); +}) +{ +} + +nano::confirmation_height_processor::~confirmation_height_processor () +{ + stop (); +} + +void nano::confirmation_height_processor::stop () +{ + stopped = true; + condition.notify_one (); + if (thread.joinable ()) + { + thread.join (); + } +} + +void nano::confirmation_height_processor::run () +{ + std::unique_lock lk (pending_confirmations.mutex); + while (!stopped) + { + if (!pending_confirmations.pending.empty ()) + { + pending_confirmations.current_hash = *pending_confirmations.pending.begin (); + pending_confirmations.pending.erase (pending_confirmations.current_hash); + // Copy the hash so can be used outside owning the lock + auto current_pending_block = pending_confirmations.current_hash; + lk.unlock (); + add_confirmation_height (current_pending_block); + lk.lock (); + pending_confirmations.current_hash = 0; + } + else + { + condition.wait (lk); + } + } +} + +void nano::confirmation_height_processor::add (nano::block_hash const & hash_a) +{ + { + std::lock_guard lk (pending_confirmations.mutex); + pending_confirmations.pending.insert (hash_a); + } + condition.notify_one (); +} + +// This only check top-level blocks having their confirmation height sets, not anything below +bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a) +{ + return pending_confirmations.is_processing_block (hash_a); +} + +/** + * For all the blocks below this height which have been implicitly confirmed check if they + * are open/receive blocks, and if so follow the source blocks and iteratively repeat to genesis. + * To limit write locking and to keep the confirmation height ledger correctly synced, confirmations are + * written from the ground upwards in batches. + */ +void nano::confirmation_height_processor::add_confirmation_height (nano::block_hash const & hash_a) +{ + boost::optional receive_details; + auto current = hash_a; + nano::account_info account_info; + std::deque pending_writes; + assert (receive_source_pairs_size == 0); + + // Store the highest confirmation heights for accounts in pending_writes to reduce unnecessary iterating + std::unordered_map confirmation_height_pending_write_cache; + + release_assert (receive_source_pairs.empty ()); + auto error = false; + // Traverse account chain and all sources for receive blocks iteratively + do + { + if (!receive_source_pairs.empty ()) + { + receive_details = receive_source_pairs.back ().receive_details; + current = receive_source_pairs.back ().source_hash; + } + else + { + // If receive_details is set then this is the final iteration and we are back to the original chain. + // We need to confirm any blocks below the original hash (incl self) and the first receive block + // (if the original block is not already a receive) + if (receive_details) + { + current = hash_a; + receive_details = boost::none; + } + } + + auto transaction (store.tx_begin_read ()); + auto block_height (store.block_account_height (transaction, current)); + nano::account account (store.block_account (transaction, current)); + release_assert (!store.account_get (transaction, account, account_info)); + auto confirmation_height = account_info.confirmation_height; + + auto account_it = confirmation_height_pending_write_cache.find (account); + if (account_it != confirmation_height_pending_write_cache.cend () && account_it->second > confirmation_height) + { + confirmation_height = account_it->second; + } + + auto count_before_open_receive = receive_source_pairs.size (); + if (block_height > confirmation_height) + { + if ((block_height - confirmation_height) > 20000) + { + logger.always_log ("Iterating over a large account chain for setting confirmation height. The top block: ", current.to_string ()); + } + + collect_unconfirmed_receive_and_sources_for_account (block_height, confirmation_height, current, account, transaction); + } + + // If this adds no more open_receive blocks, then we can now confirm this account as well as the linked open/receive block + // Collect as pending any writes to the database and do them in bulk after a certain time. + auto confirmed_receives_pending = (count_before_open_receive != receive_source_pairs.size ()); + if (!confirmed_receives_pending) + { + if (block_height > confirmation_height) + { + // Check whether the previous block has been seen. If so, the rest of sends below have already been seen so don't count them + if (account_it != confirmation_height_pending_write_cache.cend ()) + { + account_it->second = block_height; + } + else + { + confirmation_height_pending_write_cache.emplace (account, block_height); + } + + pending_writes.emplace_back (account, current, block_height, block_height - confirmation_height); + } + + if (receive_details) + { + // Check whether the previous block has been seen. If so, the rest of sends below have already been seen so don't count them + auto const & receive_account = receive_details->account; + auto receive_account_it = confirmation_height_pending_write_cache.find (receive_account); + if (receive_account_it != confirmation_height_pending_write_cache.cend ()) + { + // Get current height + auto current_height = receive_account_it->second; + receive_account_it->second = receive_details->height; + receive_details->num_blocks_confirmed = receive_details->height - current_height; + } + else + { + confirmation_height_pending_write_cache.emplace (receive_account, receive_details->height); + } + + pending_writes.push_back (*receive_details); + } + + if (!receive_source_pairs.empty ()) + { + // Pop from the end + receive_source_pairs.erase (receive_source_pairs.end () - 1); + --receive_source_pairs_size; + } + } + + // Check whether writing to the database should be done now + auto total_pending_write_block_count = std::accumulate (pending_writes.cbegin (), pending_writes.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & conf_height_details_a) { + return total += conf_height_details_a.num_blocks_confirmed; + }); + + if ((total_pending_write_block_count >= batch_write_size || receive_source_pairs.empty ()) && !pending_writes.empty ()) + { + error = write_pending (pending_writes, total_pending_write_block_count); + // Don't set any more blocks as confirmed from the original hash if an inconsistency is found + if (error) + { + receive_source_pairs.clear (); + receive_source_pairs_size = 0; + break; + } + assert (pending_writes.empty ()); + } + // Exit early when the processor has been stopped, otherwise this function may take a + // while (and hence keep the process running) if updating a long chain. + if (stopped) + { + break; + } + } while (!receive_source_pairs.empty () || current != hash_a); +} + +/* + * Returns true if there was an error in finding one of the blocks to write a confirmation height for, false otherwise + */ +bool nano::confirmation_height_processor::write_pending (std::deque & all_pending_a, int64_t total_pending_write_block_count_a) +{ + nano::account_info account_info; + auto total_pending_write_block_count (total_pending_write_block_count_a); + + // Write in batches + while (total_pending_write_block_count > 0) + { + uint64_t num_block_writes = 0; + auto transaction (store.tx_begin_write ()); + while (!all_pending_a.empty ()) + { + const auto & pending = all_pending_a.front (); + auto error = store.account_get (transaction, pending.account, account_info); + release_assert (!error); + if (pending.height > account_info.confirmation_height) + { +#ifndef NDEBUG + // Do more thorough checking in Debug mode, indicates programming error. + nano::block_sideband sideband; + auto block = store.block_get (transaction, pending.hash, &sideband); + assert (block != nullptr); + assert (sideband.height == pending.height); +#else + auto block = store.block_get (transaction, pending.hash); +#endif + // Check that the block still exists as there may have been changes outside this processor. + if (!block) + { + logger.always_log ("Failed to write confirmation height for: ", pending.hash.to_string ()); + stats.inc (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block); + return true; + } + + stats.add (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in, pending.height - account_info.confirmation_height); + assert (pending.num_blocks_confirmed == pending.height - account_info.confirmation_height); + account_info.confirmation_height = pending.height; + store.account_put (transaction, pending.account, account_info); + } + total_pending_write_block_count -= pending.num_blocks_confirmed; + num_block_writes += pending.num_blocks_confirmed; + all_pending_a.erase (all_pending_a.begin ()); + + if (num_block_writes >= batch_write_size) + { + // Commit changes periodically to reduce time holding write locks for long chains + break; + } + } + } + return false; +} + +void nano::confirmation_height_processor::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::transaction & transaction_a) +{ + auto hash (hash_a); + auto num_to_confirm = block_height_a - confirmation_height_a; + + // Store heights of blocks + constexpr auto height_not_set = std::numeric_limits::max (); + auto next_height = height_not_set; + while (num_to_confirm > 0 && !hash.is_zero ()) + { + active.confirm_block (hash); + nano::block_sideband sideband; + auto block (store.block_get (transaction_a, hash, &sideband)); + if (block) + { + auto source (block->source ()); + if (source.is_zero ()) + { + source = block->link (); + } + + if (!source.is_zero () && source != epoch_link && store.source_exists (transaction_a, source)) + { + // Set the height for the receive block above (if there is one) + if (next_height != height_not_set) + { + receive_source_pairs.back ().receive_details.num_blocks_confirmed = next_height - sideband.height; + } + + receive_source_pairs.emplace_back (conf_height_details{ account_a, hash, sideband.height, height_not_set }, source); + ++receive_source_pairs_size; + next_height = sideband.height; + } + + hash = block->previous (); + } + --num_to_confirm; + } + + // Update the number of blocks confirmed by the last receive block + if (!receive_source_pairs.empty ()) + { + receive_source_pairs.back ().receive_details.num_blocks_confirmed = receive_source_pairs.back ().receive_details.height - confirmation_height_a; + } +} + +namespace nano +{ +confirmation_height_processor::conf_height_details::conf_height_details (nano::account const & account_a, nano::block_hash const & hash_a, uint64_t height_a, uint64_t num_blocks_confirmed_a) : +account (account_a), +hash (hash_a), +height (height_a), +num_blocks_confirmed (num_blocks_confirmed_a) +{ +} + +confirmation_height_processor::receive_source_pair::receive_source_pair (confirmation_height_processor::conf_height_details const & receive_details_a, const block_hash & source_a) : +receive_details (receive_details_a), +source_hash (source_a) +{ +} + +std::unique_ptr collect_seq_con_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a) +{ + size_t receive_source_pairs_count = confirmation_height_processor_a.receive_source_pairs_size; + auto composite = std::make_unique (name_a); + composite->add_component (std::make_unique (seq_con_info{ "receive_source_pairs", receive_source_pairs_count, sizeof (decltype (confirmation_height_processor_a.receive_source_pairs)::value_type) })); + return composite; +} +} + +size_t nano::pending_confirmation_height::size () +{ + std::lock_guard lk (mutex); + return pending.size (); +} + +bool nano::pending_confirmation_height::is_processing_block (nano::block_hash const & hash_a) +{ + // First check the hash currently being processed + std::lock_guard lk (mutex); + if (!current_hash.is_zero () && current_hash == hash_a) + { + return true; + } + + // Check remaining pending confirmations + return pending.find (hash_a) != pending.cend (); +} + +nano::block_hash nano::pending_confirmation_height::current () +{ + std::lock_guard lk (mutex); + return current_hash; +} + +namespace nano +{ +std::unique_ptr collect_seq_con_info (pending_confirmation_height & pending_confirmation_height_a, const std::string & name_a) +{ + size_t pending_count = pending_confirmation_height_a.size (); + auto composite = std::make_unique (name_a); + composite->add_component (std::make_unique (seq_con_info{ "pending", pending_count, sizeof (nano::block_hash) })); + return composite; +} +} diff --git a/nano/node/confirmation_height_processor.hpp b/nano/node/confirmation_height_processor.hpp new file mode 100644 index 0000000000..7dfbb4bed5 --- /dev/null +++ b/nano/node/confirmation_height_processor.hpp @@ -0,0 +1,89 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace nano +{ +class block_store; +class stat; +class active_transactions; +class transaction; +class logger_mt; + +class pending_confirmation_height +{ +public: + size_t size (); + bool is_processing_block (nano::block_hash const &); + nano::block_hash current (); + +private: + std::mutex mutex; + std::unordered_set pending; + /** This is the last block popped off the confirmation height pending collection */ + nano::block_hash current_hash{ 0 }; + friend class confirmation_height_processor; +}; + +std::unique_ptr collect_seq_con_info (pending_confirmation_height &, const std::string &); + +class confirmation_height_processor final +{ +public: + confirmation_height_processor (pending_confirmation_height &, nano::block_store &, nano::stat &, nano::active_transactions &, nano::block_hash const &, nano::logger_mt &); + ~confirmation_height_processor (); + void add (nano::block_hash const &); + void stop (); + bool is_processing_block (nano::block_hash const &); + + /** The maximum amount of blocks to write at once */ + static uint64_t constexpr batch_write_size = 4096; + +private: + class conf_height_details final + { + public: + conf_height_details (nano::account const &, nano::block_hash const &, uint64_t, uint64_t); + + nano::account account; + nano::block_hash hash; + uint64_t height; + uint64_t num_blocks_confirmed; + }; + + class receive_source_pair final + { + public: + receive_source_pair (conf_height_details const &, const nano::block_hash &); + + conf_height_details receive_details; + nano::block_hash source_hash; + }; + + std::condition_variable condition; + nano::pending_confirmation_height & pending_confirmations; + std::atomic stopped{ false }; + nano::block_store & store; + nano::stat & stats; + nano::active_transactions & active; + nano::block_hash const & epoch_link; + nano::logger_mt & logger; + std::atomic receive_source_pairs_size{ 0 }; + std::vector receive_source_pairs; + std::thread thread; + + void run (); + void add_confirmation_height (nano::block_hash const &); + void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::transaction &); + bool write_pending (std::deque &, int64_t); + + friend std::unique_ptr collect_seq_con_info (confirmation_height_processor &, const std::string &); +}; + +std::unique_ptr collect_seq_con_info (confirmation_height_processor &, const std::string &); +} diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 432ef56857..a335877598 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -29,6 +29,7 @@ void construct_json (nano::seq_con_info_component * component, boost::property_t using ipc_json_handler_no_arg_func_map = std::unordered_map>; ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map (); auto ipc_json_handler_no_arg_funcs = create_ipc_json_handler_no_arg_func_map (); +bool block_confirmed (nano::node & node, nano::transaction & transaction, nano::block_hash const & hash, bool include_active, bool include_only_confirmed); } nano::json_handler::json_handler (nano::node & node_a, nano::node_rpc_config const & node_rpc_config_a, std::string const & body_a, std::function const & response_a, std::function stop_callback_a) : @@ -768,6 +769,7 @@ void nano::json_handler::accounts_pending () auto threshold (threshold_optional_impl ()); const bool source = request.get ("source", false); const bool include_active = request.get ("include_active", false); + const bool include_only_confirmed = request.get ("include_only_confirmed", false); const bool sorting = request.get ("sorting", false); auto simple (threshold.is_zero () && !source && !sorting); // if simple, response is a list of hashes for each account boost::property_tree::ptree pending; @@ -781,7 +783,7 @@ void nano::json_handler::accounts_pending () for (auto i (node.store.pending_begin (transaction, nano::pending_key (account, 0))); nano::pending_key (i->first).account == account && peers_l.size () < count; ++i) { nano::pending_key key (i->first); - if (include_active || node.ledger.block_confirmed (transaction, key.hash)) + if (block_confirmed (node, transaction, key.hash, include_active, include_only_confirmed)) { if (simple) { @@ -892,7 +894,7 @@ void nano::json_handler::block_info () response_l.put ("balance", balance.convert_to ()); response_l.put ("height", std::to_string (sideband.height)); response_l.put ("local_timestamp", std::to_string (sideband.timestamp)); - auto confirmed (node.ledger.block_confirmed (transaction, hash)); + auto confirmed (node.block_confirmed_or_being_confirmed (transaction, hash)); response_l.put ("confirmed", confirmed); bool json_block_l = request.get ("json_block", false); @@ -930,7 +932,7 @@ void nano::json_handler::block_confirm () auto block_l (node.store.block_get (transaction, hash)); if (block_l != nullptr) { - if (!node.ledger.block_confirmed (transaction, hash)) + if (!node.block_confirmed_or_being_confirmed (transaction, hash)) { // Start new confirmation for unconfirmed block node.block_confirm (std::move (block_l)); @@ -1047,7 +1049,7 @@ void nano::json_handler::blocks_info () entry.put ("balance", balance.convert_to ()); entry.put ("height", std::to_string (sideband.height)); entry.put ("local_timestamp", std::to_string (sideband.timestamp)); - auto confirmed (node.ledger.block_confirmed (transaction, hash)); + auto confirmed (node.block_confirmed_or_being_confirmed (transaction, hash)); entry.put ("confirmed", confirmed); if (json_block_l) @@ -1601,6 +1603,20 @@ void nano::json_handler::confirmation_active () response_errors (); } +void nano::json_handler::confirmation_height_currently_processing () +{ + auto hash = node.pending_confirmation_height.current (); + if (!hash.is_zero ()) + { + response_l.put ("hash", node.pending_confirmation_height.current ().to_string ()); + } + else + { + ec = nano::error_rpc::confirmation_height_not_processing; + } + response_errors (); +} + void nano::json_handler::confirmation_history () { boost::property_tree::ptree elections; @@ -2442,6 +2458,7 @@ void nano::json_handler::pending () const bool source = request.get ("source", false); const bool min_version = request.get ("min_version", false); const bool include_active = request.get ("include_active", false); + const bool include_only_confirmed = request.get ("include_only_confirmed", false); const bool sorting = request.get ("sorting", false); auto simple (threshold.is_zero () && !source && !min_version && !sorting); // if simple, response is a list of hashes if (!ec) @@ -2451,7 +2468,7 @@ void nano::json_handler::pending () for (auto i (node.store.pending_begin (transaction, nano::pending_key (account, 0))); nano::pending_key (i->first).account == account && peers_l.size () < count; ++i) { nano::pending_key key (i->first); - if (include_active || node.ledger.block_confirmed (transaction, key.hash)) + if (block_confirmed (node, transaction, key.hash, include_active, include_only_confirmed)) { if (simple) { @@ -2510,6 +2527,7 @@ void nano::json_handler::pending_exists () { auto hash (hash_impl ()); const bool include_active = request.get ("include_active", false); + const bool include_only_confirmed = request.get ("include_only_confirmed", false); if (!ec) { auto transaction (node.store.tx_begin_read ()); @@ -2522,7 +2540,7 @@ void nano::json_handler::pending_exists () { exists = node.store.pending_exists (transaction, nano::pending_key (destination, hash)); } - exists = exists && (include_active || !node.active.active (*block)); + exists = exists && (block_confirmed (node, transaction, block->hash (), include_active, include_only_confirmed)); response_l.put ("exists", exists ? "1" : "0"); } else @@ -4054,6 +4072,7 @@ void nano::json_handler::wallet_pending () const bool source = request.get ("source", false); const bool min_version = request.get ("min_version", false); const bool include_active = request.get ("include_active", false); + const bool include_only_confirmed = request.get ("include_only_confirmed", false); if (!ec) { boost::property_tree::ptree pending; @@ -4066,7 +4085,7 @@ void nano::json_handler::wallet_pending () for (auto ii (node.store.pending_begin (block_transaction, nano::pending_key (account, 0))); nano::pending_key (ii->first).account == account && peers_l.size () < count; ++ii) { nano::pending_key key (ii->first); - if (include_active || node.ledger.block_confirmed (block_transaction, key.hash)) + if (block_confirmed (node, block_transaction, key.hash, include_active, include_only_confirmed)) { if (threshold.is_zero () && !source) { @@ -4486,6 +4505,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("delegators_count", &nano::json_handler::delegators_count); no_arg_funcs.emplace ("deterministic_key", &nano::json_handler::deterministic_key); no_arg_funcs.emplace ("confirmation_active", &nano::json_handler::confirmation_active); + no_arg_funcs.emplace ("confirmation_height_currently_processing", &nano::json_handler::confirmation_height_currently_processing); no_arg_funcs.emplace ("confirmation_history", &nano::json_handler::confirmation_history); no_arg_funcs.emplace ("confirmation_info", &nano::json_handler::confirmation_info); no_arg_funcs.emplace ("confirmation_quorum", &nano::json_handler::confirmation_quorum); @@ -4559,4 +4579,27 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear); return no_arg_funcs; } + +/** Due to the asynchronous nature of updating confirmation heights, it can also be necessary to check active roots */ +bool block_confirmed (nano::node & node, nano::transaction & transaction, nano::block_hash const & hash, bool include_active, bool include_only_confirmed) +{ + bool is_confirmed = false; + if (include_active && !include_only_confirmed) + { + is_confirmed = true; + } + // Check whether the confirmation height is set + else if (node.block_confirmed_or_being_confirmed (transaction, hash)) + { + is_confirmed = true; + } + // This just checks it's not currently undergoing an active transaction + else if (!include_only_confirmed) + { + auto block (node.store.block_get (transaction, hash)); + is_confirmed = (block != nullptr && !node.active.active (*block)); + } + + return is_confirmed; +} } diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index ba913a53eb..6243a9d498 100644 --- a/nano/node/json_handler.hpp +++ b/nano/node/json_handler.hpp @@ -55,6 +55,7 @@ class json_handler : public std::enable_shared_from_this void confirmation_history (); void confirmation_info (); void confirmation_quorum (); + void confirmation_height_currently_processing (); void delegators (); void delegators_count (); void deterministic_key (); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index f90faa5eb1..847ca3620d 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -18,7 +18,6 @@ double constexpr nano::node::price_max; double constexpr nano::node::free_cutoff; -size_t constexpr nano::active_transactions::max_broadcast_queue; size_t constexpr nano::block_arrival::arrival_size_min; std::chrono::seconds constexpr nano::block_arrival::arrival_time_min; @@ -1052,6 +1051,7 @@ wallets (init_a.wallet_init, *this), stats (config.stat_config), vote_uniquer (block_uniquer), active (*this, delay_frontier_confirmation_height_updating), +confirmation_height_processor (pending_confirmation_height, store, ledger.stats, active, ledger.epoch_link, logger), payment_observer_processor (observers.blocks), startup_time (std::chrono::steady_clock::now ()) { @@ -1378,7 +1378,7 @@ void nano::node::process_fork (nano::transaction const & transaction_a, std::sha if (!store.block_exists (transaction_a, block_a->type (), block_a->hash ()) && store.root_exists (transaction_a, block_a->root ())) { std::shared_ptr ledger_block (ledger.forked_block (transaction_a, *block_a)); - if (ledger_block && !ledger.block_confirmed (transaction_a, ledger_block->hash ())) + if (ledger_block && !block_confirmed_or_being_confirmed (transaction_a, ledger_block->hash ())) { std::weak_ptr this_w (shared_from_this ()); if (!active.start (ledger_block, [this_w, root](std::shared_ptr) { @@ -1431,6 +1431,8 @@ std::unique_ptr collect_seq_con_info (node & node, const composite->add_component (collect_seq_con_info (node.votes_cache, "votes_cache")); composite->add_component (collect_seq_con_info (node.block_uniquer, "block_uniquer")); composite->add_component (collect_seq_con_info (node.vote_uniquer, "vote_uniquer")); + composite->add_component (collect_seq_con_info (node.confirmation_height_processor, "confirmation_height_processor")); + composite->add_component (collect_seq_con_info (node.pending_confirmation_height, "pending_confirmation_height")); return composite; } } @@ -1604,6 +1606,7 @@ void nano::node::stop () block_processor_thread.join (); } vote_processor.stop (); + confirmation_height_processor.stop (); active.stop (); network.stop (); if (websocket_server) @@ -2192,6 +2195,11 @@ void nano::node::block_confirm (std::shared_ptr block_a) } } +bool nano::node::block_confirmed_or_being_confirmed (nano::transaction const & transaction_a, nano::block_hash const & hash_a) +{ + return ledger.block_confirmed (transaction_a, hash_a) || confirmation_height_processor.is_processing_block (hash_a); +} + nano::uint128_t nano::node::delta () const { auto result ((online_reps.online_stake () / 100) * config.online_weight_quorum); @@ -2301,7 +2309,7 @@ void nano::node::process_confirmed (std::shared_ptr block_a, uint8_ auto hash (block_a->hash ()); if (ledger.block_exists (block_a->type (), hash)) { - add_confirmation_heights (hash); + confirmation_height_processor.add (hash); auto transaction (store.tx_begin_read ()); receive_confirmed (transaction, block_a, hash); @@ -2795,47 +2803,6 @@ size_t nano::election::last_votes_size () return last_votes.size (); } -void nano::active_transactions::confirm_frontiers (nano::transaction const & transaction_a) -{ - // Limit maximum count of elections to start - bool representative (node.config.enable_voting && node.wallets.reps_count > 0); - /* Check less frequently for non-representative nodes - ~15 minutes for non-representative nodes, 3 minutes for representatives */ - int representative_factor = representative ? 3 * 60 : 15 * 60; - // Decrease check time for test network - int test_network_factor = node.network_params.network.is_test_network () ? 1000 : 1; - if (std::chrono::steady_clock::now () >= next_frontier_check) - { - size_t max_elections (max_broadcast_queue / 4); - size_t elections_count (0); - for (auto i (node.store.latest_begin (transaction_a, next_frontier_account)), n (node.store.latest_end ()); i != n && !stopped && elections_count < max_elections; ++i) - { - nano::account_info info (i->second); - if (info.block_count != info.confirmation_height) - { - auto block (node.store.block_get (transaction_a, info.head)); - if (!start (block)) - { - ++elections_count; - // Calculate votes for local representatives - if (representative) - { - node.block_processor.generator.add (block->hash ()); - } - } - // Update next account - next_frontier_account = i->first.number () + 1; - } - } - // 4 times slower check if all frontiers were confirmed - int fully_confirmed_factor = (elections_count <= max_elections) ? 4 : 1; - // Calculate next check time - next_frontier_check = std::chrono::steady_clock::now () + std::chrono::seconds ((representative_factor * fully_confirmed_factor) / test_network_factor); - // Set next account to 0 if all frontiers were confirmed - next_frontier_account = (elections_count <= max_elections) ? 0 : next_frontier_account; - } -} - void nano::election::update_dependent () { assert (!node.active.mutex.try_lock ()); @@ -2869,671 +2836,6 @@ void nano::election::update_dependent () } } -void nano::active_transactions::request_confirm (std::unique_lock & lock_a) -{ - std::unordered_set inactive; - auto transaction (node.store.tx_begin_read ()); - unsigned unconfirmed_count (0); - unsigned unconfirmed_announcements (0); - std::unordered_map, std::vector>> requests_bundle; - std::deque> rebroadcast_bundle; - std::deque, std::shared_ptr>>>> confirm_req_bundle; - - auto roots_size (roots.size ()); - for (auto i (roots.get<1> ().begin ()), n (roots.get<1> ().end ()); i != n; ++i) - { - auto root (i->root); - auto election_l (i->election); - if ((election_l->confirmed || election_l->stopped) && election_l->announcements >= announcement_min - 1) - { - if (election_l->confirmed) - { - confirmed.push_back (election_l->status); - if (confirmed.size () > election_history_size) - { - confirmed.pop_front (); - } - } - inactive.insert (root); - } - else - { - if (election_l->announcements > announcement_long) - { - ++unconfirmed_count; - unconfirmed_announcements += election_l->announcements; - // Log votes for very long unconfirmed elections - if (election_l->announcements % 50 == 1) - { - auto tally_l (election_l->tally (transaction)); - election_l->log_votes (tally_l); - } - /* Escalation for long unconfirmed elections - Start new elections for previous block & source - if there are less than 100 active elections */ - if (election_l->announcements % announcement_long == 1 && roots_size < 100 && !node.network_params.network.is_test_network ()) - { - std::shared_ptr previous; - auto previous_hash (election_l->status.winner->previous ()); - if (!previous_hash.is_zero ()) - { - previous = node.store.block_get (transaction, previous_hash); - if (previous != nullptr) - { - add (std::move (previous)); - } - } - /* If previous block not existing/not commited yet, block_source can cause segfault for state blocks - So source check can be done only if previous != nullptr or previous is 0 (open account) */ - if (previous_hash.is_zero () || previous != nullptr) - { - auto source_hash (node.ledger.block_source (transaction, *election_l->status.winner)); - if (!source_hash.is_zero ()) - { - auto source (node.store.block_get (transaction, source_hash)); - if (source != nullptr) - { - add (std::move (source)); - } - } - } - election_l->update_dependent (); - } - } - if (election_l->announcements < announcement_long || election_l->announcements % announcement_long == 1) - { - if (node.ledger.could_fit (transaction, *election_l->status.winner)) - { - // Broadcast winner - if (rebroadcast_bundle.size () < max_broadcast_queue) - { - rebroadcast_bundle.push_back (election_l->status.winner); - } - } - else - { - if (election_l->announcements != 0) - { - election_l->stop (); - } - } - } - if (election_l->announcements % 4 == 1) - { - auto rep_channels (std::make_shared>> ()); - auto reps (node.rep_crawler.representatives (std::numeric_limits::max ())); - - // Add all rep endpoints that haven't already voted. We use a set since multiple - // reps may exist on an endpoint. - std::unordered_set> channels; - for (auto & rep : reps) - { - if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ()) - { - channels.insert (rep.channel); - - if (node.config.logging.vote_logging ()) - { - node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep.account.to_account ()); - } - } - } - - rep_channels->insert (rep_channels->end (), channels.begin (), channels.end ()); - - if ((!rep_channels->empty () && node.rep_crawler.total_weight () > node.config.online_weight_minimum.number ()) || roots_size > 5) - { - // broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing - if (!node.network_params.network.is_test_network ()) - { - if (confirm_req_bundle.size () < max_broadcast_queue) - { - confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, rep_channels)); - } - } - else - { - for (auto & rep : *rep_channels) - { - auto rep_request (requests_bundle.find (rep)); - auto block (election_l->status.winner); - auto root_hash (std::make_pair (block->hash (), block->root ())); - if (rep_request == requests_bundle.end ()) - { - if (requests_bundle.size () < max_broadcast_queue) - { - std::vector> insert_vector = { root_hash }; - requests_bundle.insert (std::make_pair (rep, insert_vector)); - } - } - else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max) - { - rep_request->second.push_back (root_hash); - } - } - } - } - else - { - if (!node.network_params.network.is_test_network ()) - { - auto deque_l (node.network.udp_channels.random_set (100)); - auto vec (std::make_shared>> ()); - for (auto i : deque_l) - { - vec->push_back (i); - } - confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, vec)); - } - else - { - for (auto & rep : *rep_channels) - { - auto rep_request (requests_bundle.find (rep)); - auto block (election_l->status.winner); - auto root_hash (std::make_pair (block->hash (), block->root ())); - if (rep_request == requests_bundle.end ()) - { - std::vector> insert_vector = { root_hash }; - requests_bundle.insert (std::make_pair (rep, insert_vector)); - } - else - { - rep_request->second.push_back (root_hash); - } - } - } - } - } - } - ++election_l->announcements; - } - lock_a.unlock (); - // Rebroadcast unconfirmed blocks - if (!rebroadcast_bundle.empty ()) - { - node.network.flood_block_batch (rebroadcast_bundle); - } - // Batch confirmation request - if (!node.network_params.network.is_live_network () && !requests_bundle.empty ()) - { - node.network.broadcast_confirm_req_batch (requests_bundle, 50); - } - //confirm_req broadcast - if (!confirm_req_bundle.empty ()) - { - node.network.broadcast_confirm_req_batch (confirm_req_bundle); - } - // Confirm frontiers - confirm_frontiers (transaction); - lock_a.lock (); - // Erase inactive elections - for (auto i (inactive.begin ()), n (inactive.end ()); i != n; ++i) - { - auto root_it (roots.find (*i)); - assert (root_it != roots.end ()); - for (auto & block : root_it->election->blocks) - { - auto erased (blocks.erase (block.first)); - (void)erased; - assert (erased == 1); - } - for (auto & dependent_block : root_it->election->dependent_blocks) - { - adjust_difficulty (dependent_block); - } - roots.erase (*i); - } - if (unconfirmed_count > 0) - { - node.logger.try_log (boost::str (boost::format ("%1% blocks have been unconfirmed averaging %2% announcements") % unconfirmed_count % (unconfirmed_announcements / unconfirmed_count))); - } -} - -void nano::active_transactions::request_loop () -{ - std::unique_lock lock (mutex); - started = true; - - lock.unlock (); - condition.notify_all (); - lock.lock (); - - while (!stopped) - { - request_confirm (lock); - update_active_difficulty (lock); - // This prevents unnecessary waiting if stopped is set in-between the above check and now - if (stopped) - { - break; - } - - const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2); - condition.wait_for (lock, std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay)); - } -} - -void nano::active_transactions::stop () -{ - std::unique_lock lock (mutex); - while (!started) - { - condition.wait (lock); - } - stopped = true; - lock.unlock (); - condition.notify_all (); - if (thread.joinable ()) - { - thread.join (); - } - lock.lock (); - roots.clear (); -} - -bool nano::active_transactions::start (std::shared_ptr block_a, std::function)> const & confirmation_action_a) -{ - std::lock_guard lock (mutex); - return add (block_a, confirmation_action_a); -} - -bool nano::active_transactions::add (std::shared_ptr block_a, std::function)> const & confirmation_action_a) -{ - auto error (true); - if (!stopped) - { - auto root (block_a->qualified_root ()); - auto existing (roots.find (root)); - if (existing == roots.end ()) - { - auto election (std::make_shared (node, block_a, confirmation_action_a)); - uint64_t difficulty (0); - auto error (nano::work_validate (*block_a, &difficulty)); - release_assert (!error); - roots.insert (nano::conflict_info{ root, difficulty, difficulty, election }); - blocks.insert (std::make_pair (block_a->hash (), election)); - adjust_difficulty (block_a->hash ()); - } - error = existing != roots.end (); - } - return error; -} - -// Validate a vote and apply it to the current election if one exists -bool nano::active_transactions::vote (std::shared_ptr vote_a, bool single_lock) -{ - std::shared_ptr election; - bool replay (false); - bool processed (false); - { - std::unique_lock lock; - if (!single_lock) - { - lock = std::unique_lock (mutex); - } - for (auto vote_block : vote_a->blocks) - { - nano::election_vote_result result; - if (vote_block.which ()) - { - auto block_hash (boost::get (vote_block)); - auto existing (blocks.find (block_hash)); - if (existing != blocks.end ()) - { - result = existing->second->vote (vote_a->account, vote_a->sequence, block_hash); - } - } - else - { - auto block (boost::get> (vote_block)); - auto existing (roots.find (block->qualified_root ())); - if (existing != roots.end ()) - { - result = existing->election->vote (vote_a->account, vote_a->sequence, block->hash ()); - } - } - replay = replay || result.replay; - processed = processed || result.processed; - } - } - if (processed) - { - node.network.flood_vote (vote_a); - } - return replay; -} - -bool nano::active_transactions::active (nano::qualified_root const & root_a) -{ - std::lock_guard lock (mutex); - return roots.find (root_a) != roots.end (); -} - -bool nano::active_transactions::active (nano::block const & block_a) -{ - return active (block_a.qualified_root ()); -} - -void nano::active_transactions::update_difficulty (nano::block const & block_a) -{ - std::lock_guard lock (mutex); - auto existing (roots.find (block_a.qualified_root ())); - if (existing != roots.end ()) - { - uint64_t difficulty; - auto error (nano::work_validate (block_a, &difficulty)); - assert (!error); - if (difficulty > existing->difficulty) - { - roots.modify (existing, [difficulty](nano::conflict_info & info_a) { - info_a.difficulty = difficulty; - }); - adjust_difficulty (block_a.hash ()); - } - } -} - -void nano::active_transactions::adjust_difficulty (nano::block_hash const & hash_a) -{ - assert (!mutex.try_lock ()); - std::deque> remaining_blocks; - remaining_blocks.emplace_back (hash_a, 0); - std::unordered_set processed_blocks; - std::vector> elections_list; - uint128_t sum (0); - while (!remaining_blocks.empty ()) - { - auto const & item (remaining_blocks.front ()); - auto hash (item.first); - auto level (item.second); - if (processed_blocks.find (hash) == processed_blocks.end ()) - { - auto existing (blocks.find (hash)); - if (existing != blocks.end () && !existing->second->confirmed && !existing->second->stopped && existing->second->status.winner->hash () == hash) - { - auto previous (existing->second->status.winner->previous ()); - if (!previous.is_zero ()) - { - remaining_blocks.emplace_back (previous, level + 1); - } - auto source (existing->second->status.winner->source ()); - if (!source.is_zero () && source != previous) - { - remaining_blocks.emplace_back (source, level + 1); - } - auto link (existing->second->status.winner->link ()); - if (!link.is_zero () && !node.ledger.is_epoch_link (link) && link != previous) - { - remaining_blocks.emplace_back (link, level + 1); - } - for (auto & dependent_block : existing->second->dependent_blocks) - { - remaining_blocks.emplace_back (dependent_block, level - 1); - } - processed_blocks.insert (hash); - nano::qualified_root root (previous, existing->second->status.winner->root ()); - auto existing_root (roots.find (root)); - if (existing_root != roots.end ()) - { - sum += existing_root->difficulty; - elections_list.emplace_back (root, level); - } - } - } - remaining_blocks.pop_front (); - } - if (elections_list.size () > 1) - { - uint64_t average (static_cast (sum / elections_list.size ())); - // Potential overflow check - uint64_t divider (1); - if (elections_list.size () > 1000000 && (average - node.network_params.network.publish_threshold) > elections_list.size ()) - { - divider = ((average - node.network_params.network.publish_threshold) / elections_list.size ()) + 1; - } - // Set adjusted difficulty - for (auto & item : elections_list) - { - auto existing_root (roots.find (item.first)); - uint64_t difficulty_a (average + (item.second / divider)); - roots.modify (existing_root, [difficulty_a](nano::conflict_info & info_a) { - info_a.adjusted_difficulty = difficulty_a; - }); - } - } - // Set adjusted difficulty equals to difficulty - else if (elections_list.size () == 1) - { - auto existing_root (roots.find (elections_list.begin ()->first)); - if (existing_root->difficulty != existing_root->adjusted_difficulty) - { - roots.modify (existing_root, [](nano::conflict_info & info_a) { - info_a.adjusted_difficulty = info_a.difficulty; - }); - } - } -} - -void nano::active_transactions::update_active_difficulty (std::unique_lock & lock_a) -{ - assert (lock_a.mutex () == &mutex && lock_a.owns_lock ()); - uint64_t difficulty (node.network_params.network.publish_threshold); - if (!roots.empty ()) - { - uint128_t min = roots.get<1> ().begin ()->adjusted_difficulty; - assert (min >= node.network_params.network.publish_threshold); - uint128_t max = (--roots.get<1> ().end ())->adjusted_difficulty; - assert (max >= node.network_params.network.publish_threshold); - difficulty = static_cast ((min + max) / 2); - } - assert (difficulty >= node.network_params.network.publish_threshold); - difficulty_cb.push_front (difficulty); - auto sum (std::accumulate (node.active.difficulty_cb.begin (), node.active.difficulty_cb.end (), uint128_t (0))); - difficulty = static_cast (sum / difficulty_cb.size ()); - assert (difficulty >= node.network_params.network.publish_threshold); - trended_active_difficulty = difficulty; -} - -uint64_t nano::active_transactions::active_difficulty () -{ - std::lock_guard lock (mutex); - return trended_active_difficulty; -} - -// List of active blocks in elections -std::deque> nano::active_transactions::list_blocks (bool single_lock) -{ - std::deque> result; - std::unique_lock lock; - if (!single_lock) - { - lock = std::unique_lock (mutex); - } - for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i) - { - result.push_back (i->election->status.winner); - } - return result; -} - -std::deque nano::active_transactions::list_confirmed () -{ - std::lock_guard lock (mutex); - return confirmed; -} - -void nano::active_transactions::erase (nano::block const & block_a) -{ - std::lock_guard lock (mutex); - if (roots.find (block_a.qualified_root ()) != roots.end ()) - { - roots.erase (block_a.qualified_root ()); - node.logger.try_log (boost::str (boost::format ("Election erased for block block %1% root %2%") % block_a.hash ().to_string () % block_a.root ().to_string ())); - } -} - -bool nano::active_transactions::empty () -{ - std::lock_guard lock (mutex); - return roots.empty (); -} - -size_t nano::active_transactions::size () -{ - std::lock_guard lock (mutex); - return roots.size (); -} - -nano::active_transactions::active_transactions (nano::node & node_a, bool delay_frontier_confirmation_height_updating) : -node (node_a), -difficulty_cb (20, node.network_params.network.publish_threshold), -trended_active_difficulty (node.network_params.network.publish_threshold), -next_frontier_check (std::chrono::steady_clock::now () + (delay_frontier_confirmation_height_updating ? std::chrono::seconds (60) : std::chrono::seconds (0))), -started (false), -stopped (false), -thread ([this]() { - nano::thread_role::set (nano::thread_role::name::request_loop); - request_loop (); -}) -{ - std::unique_lock lock (mutex); - while (!started) - { - condition.wait (lock); - } -} - -nano::active_transactions::~active_transactions () -{ - stop (); -} - -bool nano::active_transactions::publish (std::shared_ptr block_a) -{ - std::lock_guard lock (mutex); - auto existing (roots.find (block_a->qualified_root ())); - auto result (true); - if (existing != roots.end ()) - { - result = existing->election->publish (block_a); - if (!result) - { - blocks.insert (std::make_pair (block_a->hash (), existing->election)); - } - } - return result; -} - -void nano::active_transactions::confirm_block (nano::block_hash const & hash_a) -{ - std::lock_guard lock (mutex); - auto existing (blocks.find (hash_a)); - if (existing != blocks.end () && !existing->second->confirmed && !existing->second->stopped && existing->second->status.winner->hash () == hash_a) - { - existing->second->confirm_once (); - } -} - -namespace nano -{ -std::unique_ptr collect_seq_con_info (active_transactions & active_transactions, const std::string & name) -{ - size_t roots_count = 0; - size_t blocks_count = 0; - size_t confirmed_count = 0; - - { - std::lock_guard guard (active_transactions.mutex); - roots_count = active_transactions.roots.size (); - blocks_count = active_transactions.blocks.size (); - confirmed_count = active_transactions.confirmed.size (); - } - - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (seq_con_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) })); - composite->add_component (std::make_unique (seq_con_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) })); - composite->add_component (std::make_unique (seq_con_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) })); - return composite; -} -} - -/** - * For all the blocks below this height which have been implicitly confirmed check if they - * are open/receive blocks, and if so follow the source blocks and iteratively repeat to genesis. - */ -void nano::node::add_confirmation_heights (nano::block_hash const & hash_a) -{ - auto transaction (store.tx_begin_write ()); - std::stack> open_receive_blocks; - auto current = hash_a; - - nano::genesis genesis; - do - { - if (!open_receive_blocks.empty ()) - { - current = open_receive_blocks.top (); - open_receive_blocks.pop (); - } - - auto hash (current); - auto block_height (store.block_account_height (transaction, hash)); - assert (block_height >= 0); - nano::account_info account_info; - nano::account account (ledger.account (transaction, hash)); - release_assert (!store.account_get (transaction, account, account_info)); - auto confirmation_height = account_info.confirmation_height; - if (block_height > confirmation_height) - { - account_info.confirmation_height = block_height; - store.account_put (transaction, account, account_info); - - // Get the difference and check if any of these are recieve blocks - auto num_confirmed_blocks = block_height - confirmation_height; - - // Start from the most recent one and work our way through - for (uint64_t i = 0; i < num_confirmed_blocks && !current.is_zero (); ++i) - { - auto block (store.block_get (transaction, current)); - if (block != nullptr) - { - // Confirm blocks back - active.confirm_block (current); - // First check legacy receive/open - if (block->type () == nano::block_type::receive || (block->type () == nano::block_type::open && current != genesis.hash ())) - { - open_receive_blocks.push (block->source ()); - } - else - { - // Then check state blocks - auto state = std::dynamic_pointer_cast (block); - if (state != nullptr) - { - nano::block_hash previous (state->hashables.previous); - if (!previous.is_zero ()) - { - if (state->hashables.balance.number () >= ledger.balance (transaction, previous) && !state->hashables.link.is_zero () && !ledger.is_epoch_link (state->hashables.link)) - { - open_receive_blocks.push (state->hashables.link); - } - } - // State open blocks are always receive or epoch - else if (!ledger.is_epoch_link (state->hashables.link)) - { - open_receive_blocks.push (state->hashables.link); - } - } - } - - current = block->previous (); - } - } - } - } while (!open_receive_blocks.empty ()); -} - int nano::node::store_version () { auto transaction (store.tx_begin_read ()); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index e2d727a752..22a2206d5a 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -1,8 +1,10 @@ #pragma once #include +#include #include #include +#include #include #include #include @@ -34,14 +36,6 @@ namespace nano { class channel; class node; -class election_status final -{ -public: - std::shared_ptr winner; - nano::amount tally; - std::chrono::milliseconds election_end; - std::chrono::milliseconds election_duration; -}; class vote_info final { public: @@ -88,86 +82,6 @@ class election final : public std::enable_shared_from_this unsigned announcements; std::unordered_set dependent_blocks; }; -class conflict_info final -{ -public: - nano::qualified_root root; - uint64_t difficulty; - uint64_t adjusted_difficulty; - std::shared_ptr election; -}; -// Core class for determining consensus -// Holds all active blocks i.e. recently added blocks that need confirmation -class active_transactions final -{ -public: - explicit active_transactions (nano::node &, bool delay_frontier_confirmation_height_updating = false); - ~active_transactions (); - // Start an election for a block - // Call action with confirmed block, may be different than what we started with - // clang-format off - bool start (std::shared_ptr, std::function)> const & = [](std::shared_ptr) {}); - // clang-format on - // If this returns true, the vote is a replay - // If this returns false, the vote may or may not be a replay - bool vote (std::shared_ptr, bool = false); - // Is the root of this block in the roots container - bool active (nano::block const &); - bool active (nano::qualified_root const &); - void update_difficulty (nano::block const &); - void adjust_difficulty (nano::block_hash const &); - void update_active_difficulty (std::unique_lock &); - uint64_t active_difficulty (); - std::deque> list_blocks (bool = false); - void erase (nano::block const &); - bool empty (); - size_t size (); - void stop (); - bool publish (std::shared_ptr block_a); - void confirm_block (nano::block_hash const &); - boost::multi_index_container< - nano::conflict_info, - boost::multi_index::indexed_by< - boost::multi_index::hashed_unique< - boost::multi_index::member>, - boost::multi_index::ordered_non_unique< - boost::multi_index::member, - std::greater>>> - roots; - std::unordered_map> blocks; - std::deque list_confirmed (); - std::deque confirmed; - nano::node & node; - std::mutex mutex; - // Maximum number of conflicts to vote on per interval, lowest root hash first - static unsigned constexpr announcements_per_interval = 32; - // Minimum number of block announcements - static unsigned constexpr announcement_min = 2; - // Threshold to start logging blocks haven't yet been confirmed - static unsigned constexpr announcement_long = 20; - static size_t constexpr election_history_size = 2048; - static size_t constexpr max_broadcast_queue = 1000; - boost::circular_buffer difficulty_cb; - uint64_t trended_active_difficulty; - -private: - // Call action with confirmed block, may be different than what we started with - // clang-format off - bool add (std::shared_ptr, std::function)> const & = [](std::shared_ptr) {}); - // clang-format on - void request_loop (); - void request_confirm (std::unique_lock &); - void confirm_frontiers (nano::transaction const &); - nano::account next_frontier_account{ 0 }; - std::chrono::steady_clock::time_point next_frontier_check{ std::chrono::steady_clock::now () }; - std::condition_variable condition; - bool started; - std::atomic stopped; - boost::thread thread; -}; - -std::unique_ptr collect_seq_con_info (active_transactions & active_transactions, const std::string & name); - class operation final { public: @@ -457,6 +371,7 @@ class node final : public std::enable_shared_from_this void work_generate (nano::uint256_union const &, std::function); void add_initial_peers (); void block_confirm (std::shared_ptr); + bool block_confirmed_or_being_confirmed (nano::transaction const &, nano::block_hash const &); void process_fork (nano::transaction const &, std::shared_ptr); bool validate_block_by_previous (nano::transaction const &, std::shared_ptr); void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr, std::shared_ptr, std::shared_ptr); @@ -498,15 +413,14 @@ class node final : public std::enable_shared_from_this nano::keypair node_id; nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer; + nano::pending_confirmation_height pending_confirmation_height; // Used by both active and confirmation height processor nano::active_transactions active; + nano::confirmation_height_processor confirmation_height_processor; nano::payment_observer_processor payment_observer_processor; const std::chrono::steady_clock::time_point startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week static double constexpr price_max = 16.0; static double constexpr free_cutoff = 1024.0; - -private: - void add_confirmation_heights (nano::block_hash const & hash); }; std::unique_ptr collect_seq_con_info (node & node, const std::string & name); diff --git a/nano/node/stats.cpp b/nano/node/stats.cpp index 6e762a3eae..415fa5f80d 100644 --- a/nano/node/stats.cpp +++ b/nano/node/stats.cpp @@ -364,6 +364,8 @@ std::string nano::stat::type_to_string (uint32_t key) case nano::stat::type::message: res = "message"; break; + case nano::stat::type::confirmation_height: + res = "confirmation_height"; } return res; } @@ -506,6 +508,11 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::outdated_version: res = "outdated_version"; break; + case nano::stat::detail::invalid_block: + res = "invalid_block"; + break; + case nano::stat::detail::blocks_confirmed: + res = "blocks_confirmed"; } return res; } diff --git a/nano/node/stats.hpp b/nano/node/stats.hpp index f355c48649..360f3dd8b5 100644 --- a/nano/node/stats.hpp +++ b/nano/node/stats.hpp @@ -228,7 +228,8 @@ class stat final http_callback, peering, ipc, - udp + udp, + confirmation_height }; /** Optional detail type */ @@ -296,6 +297,10 @@ class stat final // peering handshake, + + // confirmation height + blocks_confirmed, + invalid_block }; /** Direction of the stat. If the direction is irrelevant, use in */ diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index e504888f6d..593198abb8 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1233,7 +1233,7 @@ bool nano::wallet::search_pending () { wallets.node.logger.try_log (boost::str (boost::format ("Found a pending block %1% for account %2%") % hash.to_string () % pending.source.to_account ())); auto block (wallets.node.store.block_get (block_transaction, hash)); - if (wallets.node.ledger.block_confirmed (block_transaction, hash)) + if (wallets.node.block_confirmed_or_being_confirmed (block_transaction, hash)) { // Receive confirmed block auto node_l (wallets.node.shared ()); diff --git a/nano/rpc/rpc_handler.cpp b/nano/rpc/rpc_handler.cpp index afe7af2a29..c1c6206980 100644 --- a/nano/rpc/rpc_handler.cpp +++ b/nano/rpc/rpc_handler.cpp @@ -117,6 +117,7 @@ std::unordered_set create_rpc_control_impls () set.emplace ("accounts_create"); set.emplace ("block_create"); set.emplace ("bootstrap_lazy"); + set.emplace ("confirmation_height_currently_processing"); set.emplace ("keepalive"); set.emplace ("ledger"); set.emplace ("node_id"); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 0d0d0ed162..6001e47838 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -12,6 +12,8 @@ #include #include +using namespace std::chrono_literals; + namespace { class test_response @@ -98,6 +100,27 @@ void enable_ipc_transport_tcp (nano::ipc::ipc_config_tcp_socket & transport_tcp) static nano::network_constants network_constants; enable_ipc_transport_tcp (transport_tcp, network_constants.default_ipc_port); } + +void reset_confirmation_height (nano::block_store & store, nano::account const & account) +{ + auto transaction = store.tx_begin_write (); + nano::account_info account_info; + store.account_get (transaction, account, account_info); + account_info.confirmation_height = 0; + store.account_put (transaction, account, account_info); +} + +void check_block_response_count (nano::system & system, nano::rpc & rpc, boost::property_tree::ptree & request, uint64_t size_count) +{ + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (5s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_EQ (size_count, response.json.get_child ("blocks").front ().second.size ()); +} } TEST (rpc, account_balance) @@ -2245,6 +2268,27 @@ TEST (rpc, pending) ASSERT_EQ (amounts[block1->hash ()], 100); ASSERT_EQ (sources[block1->hash ()], nano::test_genesis_key.pub); } + + request.put ("account", key1.pub.to_account ()); + request.put ("source", "false"); + request.put ("min_version", "false"); + + auto check_block_response_count = [&system, &request, &rpc](size_t size) { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (5s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + + ASSERT_EQ (200, response.status); + ASSERT_EQ (size, response.json.get_child ("blocks").size ()); + }; + + request.put ("include_only_confirmed", "true"); + check_block_response_count (1); + reset_confirmation_height (system.nodes.front ()->store, block1->account ()); + check_block_response_count (0); } TEST (rpc, search_pending) @@ -3542,6 +3586,11 @@ TEST (rpc, accounts_pending) ASSERT_EQ (amounts[block1->hash ()], 100); ASSERT_EQ (sources[block1->hash ()], nano::test_genesis_key.pub); } + + request.put ("include_only_confirmed", "true"); + check_block_response_count (system, rpc, request, 1); + reset_confirmation_height (system.nodes.front ()->store, block1->account ()); + check_block_response_count (system, rpc, request, 0); } TEST (rpc, blocks) @@ -3691,27 +3740,30 @@ TEST (rpc, pending_exists) nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); rpc.start (); boost::property_tree::ptree request; + + auto pending_exists = [&system, &request, &rpc](const char * exists_a) { + test_response response0 (request, rpc.config.port, system.io_ctx); + system.deadline_set (5s); + while (response0.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response0.status); + std::string exists_text (response0.json.get ("exists")); + ASSERT_EQ (exists_a, exists_text); + }; + request.put ("action", "pending_exists"); request.put ("hash", hash0.to_string ()); - test_response response0 (request, rpc.config.port, system.io_ctx); - system.deadline_set (5s); - while (response0.status == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } - ASSERT_EQ (200, response0.status); - std::string exists_text (response0.json.get ("exists")); - ASSERT_EQ ("0", exists_text); + pending_exists ("0"); + request.put ("hash", block1->hash ().to_string ()); - test_response response1 (request, rpc.config.port, system.io_ctx); - system.deadline_set (5s); - while (response1.status == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } - ASSERT_EQ (200, response1.status); - std::string exists_text1 (response1.json.get ("exists")); - ASSERT_EQ ("1", exists_text1); + pending_exists ("1"); + + request.put ("include_only_confirmed", "true"); + pending_exists ("1"); + reset_confirmation_height (system.nodes.front ()->store, block1->account ()); + pending_exists ("0"); } TEST (rpc, wallet_pending) @@ -3816,6 +3868,21 @@ TEST (rpc, wallet_pending) } ASSERT_EQ (amounts[block1->hash ()], 100); ASSERT_EQ (sources[block1->hash ()], nano::test_genesis_key.pub); + + request.put ("include_only_confirmed", "true"); + check_block_response_count (system0, rpc, request, 1); + reset_confirmation_height (system0.nodes.front ()->store, block1->account ()); + + { + test_response response (request, rpc.config.port, system0.io_ctx); + system0.deadline_set (5s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system0.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_EQ (0, response.json.get_child ("blocks").size ()); + } } TEST (rpc, receive_minimum) @@ -5164,6 +5231,98 @@ TEST (rpc, online_reps) system.nodes[1]->stop (); } +// If this test fails, try increasing the num_blocks size. +TEST (rpc, confirmation_height_currently_processing) +{ + // The chains should be longer than the batch_write_size to test the amount of blocks confirmed is correct. + bool delay_frontier_confirmation_height_updating = true; + nano::system system (24000, 1, delay_frontier_confirmation_height_updating); + auto node = system.nodes.front (); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + + // Do enough blocks to reliably call RPC before the confirmation height has finished + constexpr auto num_blocks = 500; + auto previous_genesis_chain_hash = node->latest (nano::test_genesis_key.pub); + { + auto transaction = node->store.tx_begin_write (); + for (auto i = num_blocks; i > 0; --i) + { + nano::send_block send (previous_genesis_chain_hash, nano::genesis_account, nano::genesis_amount - nano::Gxrb_ratio + i + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous_genesis_chain_hash)); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + previous_genesis_chain_hash = send.hash (); + } + + nano::keypair key1; + nano::send_block send (previous_genesis_chain_hash, key1.pub, nano::genesis_amount - nano::Gxrb_ratio - 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous_genesis_chain_hash)); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + previous_genesis_chain_hash = send.hash (); + } + + std::shared_ptr frontier; + { + auto transaction = node->store.tx_begin_read (); + frontier = node->store.block_get (transaction, previous_genesis_chain_hash); + } + + // Begin process for confirming the block (and setting confirmation height) + node->block_confirm (frontier); + + boost::property_tree::ptree request; + request.put ("action", "confirmation_height_currently_processing"); + + enable_ipc_transport_tcp (node->config.ipc_config.transport_tcp); + nano::ipc::ipc_server ipc_server (*node); + nano::rpc_config rpc_config (true); + nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config); + nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); + rpc.start (); + + system.deadline_set (10s); + while (!node->pending_confirmation_height.is_processing_block (previous_genesis_chain_hash)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Make the request + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + auto hash (response.json.get ("hash")); + ASSERT_EQ (frontier->hash ().to_string (), hash); + } + + // Wait until confirmation has been set + system.deadline_set (10s); + while (true) + { + auto transaction = node->store.tx_begin_read (); + if (node->ledger.block_confirmed (transaction, frontier->hash ())) + { + break; + } + + ASSERT_NO_ERROR (system.poll ()); + } + + // Make the same request, it should now return an error + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + std::error_code ec (nano::error_rpc::confirmation_height_not_processing); + ASSERT_EQ (response.json.get ("error"), ec.message ()); + } +} + TEST (rpc, confirmation_history) { nano::system system (24000, 1); @@ -5806,8 +5965,13 @@ TEST (rpc, block_confirmed) auto send = std::make_shared (latest, key.pub, 10, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)); node->process_active (send); node->block_processor.flush (); + system.deadline_set (10s); + while (!node->confirmation_height_processor.is_processing_block (send->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } - // Wait until it has been confirmed by the network + // Wait until the confirmation height has been set system.deadline_set (10s); while (true) { @@ -5820,6 +5984,9 @@ TEST (rpc, block_confirmed) ASSERT_NO_ERROR (system.poll ()); } + // Should no longer be processing the block after confirmation is set + ASSERT_FALSE (node->confirmation_height_processor.is_processing_block (send->hash ())); + // Requesting confirmation for this should now succeed request.put ("hash", send->hash ().to_string ()); test_response response3 (request, rpc.config.port, system.io_ctx); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 11243033c5..c2eb097e2d 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -443,3 +443,84 @@ TEST (node, mass_vote_by_hash) system.nodes[0]->block_processor.add (*i, nano::seconds_since_epoch ()); } } + +TEST (confirmation_height, long_chains) +{ + // The chains should be longer than the batch_write_size to test the amount of blocks confirmed is correct. + bool delay_frontier_confirmation_height_updating = true; + nano::system system (24000, 1, delay_frontier_confirmation_height_updating); + auto node = system.nodes.front (); + nano::keypair key1; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest (node->latest (nano::test_genesis_key.pub)); + system.wallet (0)->insert_adhoc (key1.prv); + + auto num_blocks = nano::confirmation_height_processor::batch_write_size * 2 + 50; // Give it a slight offset so it's not completely evenly divisible + + // First open the other account + nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio + num_blocks + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)); + nano::open_block open (send.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)); + { + auto transaction = node->store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code); + } + + // Bulk send from genesis account to destination account + auto previous_genesis_chain_hash = send.hash (); + auto previous_destination_chain_hash = open.hash (); + { + auto transaction = node->store.tx_begin_write (); + for (auto i = num_blocks - 1; i > 0; --i) + { + nano::send_block send (previous_genesis_chain_hash, key1.pub, nano::genesis_amount - nano::Gxrb_ratio + i + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous_genesis_chain_hash)); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + nano::receive_block receive (previous_destination_chain_hash, send.hash (), key1.prv, key1.pub, system.work.generate (previous_destination_chain_hash)); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive).code); + + previous_genesis_chain_hash = send.hash (); + previous_destination_chain_hash = receive.hash (); + } + } + + // Send one from destination to genesis and pocket it + nano::send_block send1 (previous_destination_chain_hash, nano::test_genesis_key.pub, nano::Gxrb_ratio - 2, key1.prv, key1.pub, system.work.generate (previous_destination_chain_hash)); + auto receive1 (std::make_shared (nano::test_genesis_key.pub, previous_genesis_chain_hash, nano::genesis_account, nano::genesis_amount - nano::Gxrb_ratio + 1, send1.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous_genesis_chain_hash))); + + // Unpocketed + nano::state_block send2 (nano::genesis_account, receive1->hash (), nano::genesis_account, nano::genesis_amount - nano::Gxrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (receive1->hash ())); + + { + auto transaction = node->store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *receive1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + } + + // Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts + node->block_confirm (receive1); + + system.deadline_set (10s); + while (true) + { + auto transaction = node->store.tx_begin_read (); + if (node->ledger.block_confirmed (transaction, receive1->hash ())) + { + break; + } + + ASSERT_NO_ERROR (system.poll ()); + } + + auto transaction (node->store.tx_begin ()); + nano::account_info account_info; + ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); + ASSERT_EQ (num_blocks + 2, account_info.confirmation_height); + ASSERT_EQ (num_blocks + 3, account_info.block_count); // Includes the unpocketed send + + ASSERT_FALSE (node->store.account_get (transaction, key1.pub, account_info)); + ASSERT_EQ (num_blocks + 1, account_info.confirmation_height); + ASSERT_EQ (num_blocks + 1, account_info.block_count); + + ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_blocks * 2 + 2); +}