Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous backlog population #3999

Merged
merged 10 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_executable(
fakes/websocket_client.hpp
fakes/work_peer.hpp
active_transactions.cpp
backlog.cpp
block.cpp
block_store.cpp
blockprocessor.cpp
Expand Down
4 changes: 4 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ TEST (active_transactions, confirm_election_by_request)
// Ensure election on node1 is already confirmed before connecting with node2
ASSERT_TIMELY (5s, nano::test::confirmed (node1, { send1 }));

// Wait for the election to be removed and give time for any in-flight vote broadcasts to settle
ASSERT_TIMELY (5s, node1.active.empty ());
WAIT (1s);

// At this point node1 should not generate votes for send1 block unless it receives a request

// Create a second node
Expand Down
107 changes: 107 additions & 0 deletions nano/core_test/backlog.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#include <nano/node/active_transactions.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

#include <numeric>

using namespace std::chrono_literals;

namespace
{
using block_list_t = std::vector<std::shared_ptr<nano::block>>;

/*
* Creates `count` 1 raw sends from genesis to unique accounts and corresponding open blocks.
* The genesis chain is then confirmed, but leaves open blocks unconfirmed
* The list of unconfirmed open blocks is returned.
*/
block_list_t setup_independent_blocks (nano::test::system & system, nano::node & node, int count)
{
std::vector<std::shared_ptr<nano::block>> blocks;

auto latest = node.latest (nano::dev::genesis_key.pub);
auto balance = node.balance (nano::dev::genesis_key.pub);

for (int n = 0; n < count; ++n)
{
nano::keypair key;
nano::block_builder builder;

balance -= 1;
auto send = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (latest)
.representative (nano::dev::genesis_key.pub)
.balance (balance)
.link (key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest))
.build_shared ();
latest = send->hash ();

auto open = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build_shared ();

EXPECT_TRUE (nano::test::process (node, { send, open }));
EXPECT_TIMELY (5s, nano::test::exists (node, { send, open })); // Ensure blocks are in the ledger

blocks.push_back (open);
}

// Confirm whole genesis chain at once
EXPECT_TIMELY (5s, nano::test::confirm (node, { latest }));
EXPECT_TIMELY (5s, nano::test::confirmed (node, { latest }));

return blocks;
}
}

/*
* Ensures all not confirmed accounts get activated by backlog scan periodically
*/
TEST (backlog, population)
{
nano::test::system system{};
auto & node = *system.add_node ();

nano::mutex mutex;
std::unordered_set<nano::account> activated;

node.backlog.activate_callback.add ([&] (nano::transaction const & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) {
nano::lock_guard<nano::mutex> lock{ mutex };

activated.insert (account);
});

auto blocks = setup_independent_blocks (system, node, 256);

// Checks if `activated` set contains all accounts we previously set up
auto all_activated = [&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
return std::all_of (blocks.begin (), blocks.end (), [&] (auto const & item) {
auto account = item->account ();
debug_assert (!account.is_zero ());
return activated.count (account) != 0;
});
};
ASSERT_TIMELY (5s, all_activated ());

// Clear activated set to ensure we activate those accounts more than once
{
nano::lock_guard<nano::mutex> lock{ mutex };
activated.clear ();
}

ASSERT_TIMELY (5s, all_activated ());
}
7 changes: 7 additions & 0 deletions nano/core_test/toml.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers);
ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads);
ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_EQ (conf.node.backlog_scan_batch_size, defaults.node.backlog_scan_batch_size);
ASSERT_EQ (conf.node.backlog_scan_frequency, defaults.node.backlog_scan_frequency);

ASSERT_EQ (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
ASSERT_EQ (conf.node.logging.flush, defaults.node.logging.flush);
Expand Down Expand Up @@ -433,6 +435,9 @@ TEST (toml, daemon_config_deserialize_no_defaults)
max_work_generate_multiplier = 1.0
max_queued_requests = 999
frontiers_confirmation = "always"
backlog_scan_batch_size = 999
backlog_scan_frequency = 999

[node.diagnostics.txn_tracking]
enable = true
ignore_writes_below_block_processor_max_time = false
Expand Down Expand Up @@ -602,6 +607,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.work_peers, defaults.node.work_peers);
ASSERT_NE (conf.node.work_threads, defaults.node.work_threads);
ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_NE (conf.node.backlog_scan_batch_size, defaults.node.backlog_scan_batch_size);
ASSERT_NE (conf.node.backlog_scan_frequency, defaults.node.backlog_scan_frequency);

ASSERT_NE (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
ASSERT_NE (conf.node.logging.flush, defaults.node.logging.flush);
Expand Down
10 changes: 10 additions & 0 deletions nano/lib/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,9 @@ std::string nano::stat::type_to_string (stat::type type)
case nano::stat::type::active:
res = "active";
break;
case nano::stat::type::backlog:
res = "backlog";
break;
}
return res;
}
Expand All @@ -572,6 +575,9 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::loop:
res = "loop";
break;
case nano::stat::detail::total:
res = "total";
break;
case nano::stat::detail::queue:
res = "queue";
break;
Expand Down Expand Up @@ -811,6 +817,7 @@ std::string nano::stat::detail_to_string (stat::detail detail)
break;
case nano::stat::detail::election_not_confirmed:
res = "election_not_confirmed";
break;
case nano::stat::detail::election_hinted_overflow:
res = "election_hinted_overflow";
break;
Expand Down Expand Up @@ -1045,6 +1052,9 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::channel_full:
res = "channel_full";
break;
case nano::stat::detail::activated:
res = "activated";
break;
}
return res;
}
Expand Down
5 changes: 5 additions & 0 deletions nano/lib/stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class stat final
blockprocessor,
bootstrap_server,
active,
backlog,
};

/** Optional detail type */
Expand All @@ -258,6 +259,7 @@ class stat final

// common
loop,
total,

// processing queue
queue,
Expand Down Expand Up @@ -458,6 +460,9 @@ class stat final
response_blocks,
response_account_info,
channel_full,

// backlog
activated,
};

/** Direction of the stat. If the direction is irrelevant, use in */
Expand Down
94 changes: 64 additions & 30 deletions nano/node/backlog_population.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,42 @@
#include <nano/node/nodeconfig.hpp>
#include <nano/secure/store.hpp>

nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) :
nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::stat & stats_a) :
config_m{ config_a },
store_m{ store_a },
scheduler{ scheduler_a }
store{ store_a },
stats{ stats_a }
{
}

nano::backlog_population::~backlog_population ()
{
stop ();
if (thread.joinable ())
{
thread.join ();
}
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::backlog_population::start ()
{
if (!thread.joinable ())
{
thread = std::thread{ [this] () { run (); } };
}
debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::backlog_population);
run ();
} };
}

void nano::backlog_population::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
notify ();
nano::join_or_pass (thread);
}

void nano::backlog_population::trigger ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
triggered = true;
{
nano::unique_lock<nano::mutex> lock{ mutex };
triggered = true;
}
notify ();
}

Expand All @@ -49,48 +50,81 @@ void nano::backlog_population::notify ()

bool nano::backlog_population::predicate () const
{
return triggered;
return triggered || config_m.enabled;
}

void nano::backlog_population::run ()
{
nano::thread_role::set (nano::thread_role::name::backlog_population);
const auto delay = std::chrono::seconds{ config_m.delay_between_runs_in_seconds };
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (predicate () || config_m.ongoing_backlog_population_enabled)
if (predicate ())
{
stats.inc (nano::stat::type::backlog, nano::stat::detail::loop);

triggered = false;
lock.unlock ();
populate_backlog ();
lock.lock ();
}

condition.wait_for (lock, delay, [this] () {
condition.wait (lock, [this] () {
return stopped || predicate ();
});
}
}

void nano::backlog_population::populate_backlog ()
{
debug_assert (config_m.frequency > 0);

const auto chunk_size = config_m.batch_size / config_m.frequency;
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store_m.tx_begin_read ();
auto count = 0;
auto i = store_m.account.begin (transaction, next);
const auto end = store_m.account.end ();
for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
auto transaction = store.tx_begin_read ();

auto count = 0;
auto i = store.account.begin (transaction, next);
const auto end = store.account.end ();
for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total)
{
stats.inc (nano::stat::type::backlog, nano::stat::detail::total);

auto const & account = i->first;
activate (transaction, account);
next = account.number () + 1;
}
done = store.account.begin (transaction, next) == end;
}
done = store_m.account.begin (transaction, next) == end;

// Give the rest of the node time to progress without holding database lock
std::this_thread::sleep_for (std::chrono::milliseconds (1000 / config_m.frequency));
}
}

void nano::backlog_population::activate (nano::transaction const & transaction, nano::account const & account)
{
debug_assert (!activate_callback.empty ());

auto const maybe_account_info = store.account.get (transaction, account);
if (!maybe_account_info)
{
return;
}
auto const account_info = *maybe_account_info;

auto const maybe_conf_info = store.confirmation_height.get (transaction, account);
auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{});

// If conf info is empty then it means then it means nothing is confirmed yet
if (conf_info.height < account_info.block_count)
{
stats.inc (nano::stat::type::backlog, nano::stat::detail::activated);

activate_callback.notify (transaction, account, account_info, conf_info);
}
}
Loading