Skip to content

Commit

Permalink
Reduce length holding read transaction open for in confirmation heigh…
Browse files Browse the repository at this point in the history
…t processor (#1954)

* Reduce holding read transaction open to a few seconds maximum

* Write lock shouldn't be based on number of blocks being cemented

* Reverse order of read transactions being opened in do-while loop
  • Loading branch information
wezrule authored May 9, 2019
1 parent 29ec4f5 commit 0c4fc6e
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 25 deletions.
33 changes: 33 additions & 0 deletions nano/core_test/block_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,39 @@ TEST (block_store, incompatible_version)
}
}

TEST (block_store, reset_renew_existing_transaction)
{
nano::logger_mt logger;
bool init (false);
nano::mdb_store store (init, logger, nano::unique_path ());
ASSERT_TRUE (!init);

nano::keypair key1;
nano::open_block block (0, 1, 1, nano::keypair ().prv, 0, 0);
nano::uint256_union hash1 (block.hash ());
auto read_transaction = store.tx_begin_read ();

// Block shouldn't exist yet
auto block_non_existing (store.block_get (read_transaction, hash1));
ASSERT_EQ (nullptr, block_non_existing);

// Release resources for the transaction
read_transaction.reset ();

// Write the block
{
auto write_transaction (store.tx_begin_write ());
nano::block_sideband sideband (nano::block_type::open, 0, 0, 0, 0, 0);
store.block_put (write_transaction, hash1, block, sideband);
}

read_transaction.renew ();

// Block should exist now
auto block_existing (store.block_get (read_transaction, hash1));
ASSERT_NE (nullptr, block_existing);
}

namespace
{
// These functions take the latest account_info and create a legacy one so that upgrade tests can be emulated more easily.
Expand Down
51 changes: 32 additions & 19 deletions nano/node/confirmation_height_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h

release_assert (receive_source_pairs.empty ());
auto error = false;

auto read_transaction (store.tx_begin_read ());
// Traverse account chain and all sources for receive blocks iteratively
do
{
Expand All @@ -117,10 +119,9 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
}
}

auto transaction (store.tx_begin_read ());
auto block_height (store.block_account_height (transaction, current));
nano::account account (store.block_account (transaction, current));
release_assert (!store.account_get (transaction, account, account_info));
auto block_height (store.block_account_height (read_transaction, current));
nano::account account (store.block_account (read_transaction, current));
release_assert (!store.account_get (read_transaction, account, account_info));
auto confirmation_height = account_info.confirmation_height;

auto account_it = confirmation_height_pending_write_cache.find (account);
Expand All @@ -129,20 +130,23 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
confirmation_height = account_it->second;
}

auto count_before_open_receive = receive_source_pairs.size ();
auto count_before_receive = receive_source_pairs.size ();
if (block_height > confirmation_height)
{
if ((block_height - confirmation_height) > 20000)
{
logger.always_log ("Iterating over a large account chain for setting confirmation height. The top block: ", current.to_string ());
}

collect_unconfirmed_receive_and_sources_for_account (block_height, confirmation_height, current, account, transaction);
collect_unconfirmed_receive_and_sources_for_account (block_height, confirmation_height, current, account, read_transaction);
}

// If this adds no more open_receive blocks, then we can now confirm this account as well as the linked open/receive block
// No longer need the read transaction
read_transaction.reset ();

// If this adds no more open or receive blocks, then we can now confirm this account as well as the linked open/receive block
// Collect as pending any writes to the database and do them in bulk after a certain time.
auto confirmed_receives_pending = (count_before_open_receive != receive_source_pairs.size ());
auto confirmed_receives_pending = (count_before_receive != receive_source_pairs.size ());
if (!confirmed_receives_pending)
{
if (block_height > confirmation_height)
Expand Down Expand Up @@ -193,7 +197,7 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
return total += conf_height_details_a.num_blocks_confirmed;
});

if ((total_pending_write_block_count >= batch_write_size || receive_source_pairs.empty ()) && !pending_writes.empty ())
if ((pending_writes.size () >= batch_write_size || receive_source_pairs.empty ()) && !pending_writes.empty ())
{
error = write_pending (pending_writes, total_pending_write_block_count);
// Don't set any more blocks as confirmed from the original hash if an inconsistency is found
Expand All @@ -211,6 +215,8 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
{
break;
}

read_transaction.renew ();
} while (!receive_source_pairs.empty () || current != hash_a);
}

Expand All @@ -225,7 +231,7 @@ bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_
// Write in batches
while (total_pending_write_block_count > 0)
{
uint64_t num_block_writes = 0;
uint64_t num_accounts_processed = 0;
auto transaction (store.tx_begin_write ());
while (!all_pending_a.empty ())
{
Expand Down Expand Up @@ -257,10 +263,10 @@ bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_
store.account_put (transaction, pending.account, account_info);
}
total_pending_write_block_count -= pending.num_blocks_confirmed;
num_block_writes += pending.num_blocks_confirmed;
++num_accounts_processed;
all_pending_a.erase (all_pending_a.begin ());

if (num_block_writes >= batch_write_size)
if (num_accounts_processed >= batch_write_size)
{
// Commit changes periodically to reduce time holding write locks for long chains
break;
Expand All @@ -270,19 +276,18 @@ bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_
return false;
}

void nano::confirmation_height_processor::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::transaction & transaction_a)
void nano::confirmation_height_processor::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::read_transaction const & transaction_a)
{
auto hash (hash_a);
auto num_to_confirm = block_height_a - confirmation_height_a;

// Store heights of blocks
constexpr auto height_not_set = std::numeric_limits<uint64_t>::max ();
auto next_height = height_not_set;
while (num_to_confirm > 0 && !hash.is_zero ())
while ((num_to_confirm > 0) && !hash.is_zero ())
{
active.confirm_block (hash);
nano::block_sideband sideband;
auto block (store.block_get (transaction_a, hash, &sideband));
auto block (store.block_get (transaction_a, hash));
if (block)
{
auto source (block->source ());
Expand All @@ -293,19 +298,27 @@ void nano::confirmation_height_processor::collect_unconfirmed_receive_and_source

if (!source.is_zero () && source != epoch_link && store.source_exists (transaction_a, source))
{
auto block_height = confirmation_height_a + num_to_confirm;
// Set the height for the receive block above (if there is one)
if (next_height != height_not_set)
{
receive_source_pairs.back ().receive_details.num_blocks_confirmed = next_height - sideband.height;
receive_source_pairs.back ().receive_details.num_blocks_confirmed = next_height - block_height;
}

receive_source_pairs.emplace_back (conf_height_details{ account_a, hash, sideband.height, height_not_set }, source);
receive_source_pairs.emplace_back (conf_height_details{ account_a, hash, block_height, height_not_set }, source);
++receive_source_pairs_size;
next_height = sideband.height;
next_height = block_height;
}

hash = block->previous ();
}

// We could be traversing a very large account so we don't want to have open read transactions for too long.
if (num_to_confirm % batch_read_size == 0)
{
transaction_a.refresh ();
}

--num_to_confirm;
}

Expand Down
11 changes: 7 additions & 4 deletions nano/node/confirmation_height_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace nano
class block_store;
class stat;
class active_transactions;
class transaction;
class read_transaction;
class logger_mt;

class pending_confirmation_height
Expand Down Expand Up @@ -41,8 +41,11 @@ class confirmation_height_processor final
void stop ();
bool is_processing_block (nano::block_hash const &);

/** The maximum amount of blocks to write at once */
static uint64_t constexpr batch_write_size = 4096;
/** The maximum amount of accounts to iterate over while writing */
static uint64_t constexpr batch_write_size = 2048;

/** The maximum number of blocks to be read in while iterating over a long account chain */
static uint64_t constexpr batch_read_size = 4096;

private:
class conf_height_details final
Expand Down Expand Up @@ -79,7 +82,7 @@ class confirmation_height_processor final

void run ();
void add_confirmation_height (nano::block_hash const &);
void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::transaction &);
void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::read_transaction const &);
bool write_pending (std::deque<conf_height_details> &, int64_t);

friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (confirmation_height_processor &, const std::string &);
Expand Down
6 changes: 6 additions & 0 deletions nano/secure/blockstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ void nano::read_transaction::renew () const
impl->renew ();
}

void nano::read_transaction::refresh () const
{
reset ();
renew ();
}

nano::write_transaction::write_transaction (std::unique_ptr<nano::write_transaction_impl> write_transaction_impl) :
impl (std::move (write_transaction_impl))
{
Expand Down
1 change: 1 addition & 0 deletions nano/secure/blockstore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class read_transaction final : public transaction
void * get_handle () const override;
void reset () const;
void renew () const;
void refresh () const;

private:
std::unique_ptr<nano::read_transaction_impl> impl;
Expand Down
72 changes: 70 additions & 2 deletions nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,77 @@ TEST (node, mass_vote_by_hash)
}
}

TEST (confirmation_height, many_accounts)
{
bool delay_frontier_confirmation_height_updating = true;
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.online_weight_minimum = 100;
auto node = system.add_node (node_config, delay_frontier_confirmation_height_updating);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);

// The number of frontiers should be more than the batch_write_size to test the amount of blocks confirmed is correct.
auto num_accounts = nano::confirmation_height_processor::batch_write_size * 2 + 50;
nano::keypair last_keypair = nano::test_genesis_key;
auto last_open_hash = node->latest (nano::test_genesis_key.pub);
{
auto transaction = node->store.tx_begin_write ();
for (auto i = num_accounts - 1; i > 0; --i)
{
nano::keypair key;
system.wallet (0)->insert_adhoc (key.prv);

nano::send_block send (last_open_hash, key.pub, nano::Gxrb_ratio, last_keypair.prv, last_keypair.pub, system.work.generate (last_open_hash));
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
nano::open_block open (send.hash (), last_keypair.pub, key.pub, key.prv, key.pub, system.work.generate (key.pub));
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code);
last_open_hash = open.hash ();
last_keypair = key;
}
}

// Call block confirm on the last open block which will confirm everything
{
auto transaction = node->store.tx_begin_read ();
auto block = node->store.block_get (transaction, last_open_hash);
node->block_confirm (block);
}

system.deadline_set (60s);
while (true)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, last_open_hash))
{
break;
}

ASSERT_NO_ERROR (system.poll ());
}

auto transaction (node->store.tx_begin_read ());
// All frontiers (except last) should have 2 blocks and both should be confirmed
for (auto i (node->store.latest_begin (transaction)), n (node->store.latest_end ()); i != n; ++i)
{
auto & account = i->first;
auto & account_info = i->second;
if (account != last_keypair.pub)
{
ASSERT_EQ (2, account_info.confirmation_height);
ASSERT_EQ (2, account_info.block_count);
}
else
{
ASSERT_EQ (1, account_info.confirmation_height);
ASSERT_EQ (1, account_info.block_count);
}
}

ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_accounts * 2 - 2);
}

TEST (confirmation_height, long_chains)
{
// The chains should be longer than the batch_write_size to test the amount of blocks confirmed is correct.
bool delay_frontier_confirmation_height_updating = true;
nano::system system;
auto node = system.add_node (nano::node_config (24000, system.logging), delay_frontier_confirmation_height_updating);
Expand All @@ -454,7 +522,7 @@ TEST (confirmation_height, long_chains)
nano::block_hash latest (node->latest (nano::test_genesis_key.pub));
system.wallet (0)->insert_adhoc (key1.prv);

auto num_blocks = nano::confirmation_height_processor::batch_write_size * 2 + 50; // Give it a slight offset so it's not completely evenly divisible
constexpr auto num_blocks = 10000;

// First open the other account
nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio + num_blocks + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest));
Expand Down

0 comments on commit 0c4fc6e

Please sign in to comment.