Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttled received block for bootstrap #2216

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2802,6 +2802,63 @@ TEST (node, block_processor_reject_rolled_back)
ASSERT_TRUE (node.active.empty ());
}

TEST (node, block_processor_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 2;
auto & node = *system.add_node (nano::node_config (24000, system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send1);
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send2);
auto send3 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send2->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 3 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send3);
// The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::confirmation_height);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.full ());
node.block_processor.add (send2);
ASSERT_FALSE (node.block_processor.full ());
node.block_processor.add (send3);
// Block processor may be not full during state blocks signatures verification
system.deadline_set (2s);
while (!node.block_processor.full ())
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (node, block_processor_half_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 4;
auto & node = *system.add_node (nano::node_config (24000, system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send1);
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send2);
auto send3 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send2->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 3 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send3);
// The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::confirmation_height);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send3);
// Block processor may be not half_full during state blocks signatures verification
system.deadline_set (2s);
while (!node.block_processor.half_full ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_FALSE (node.block_processor.full ());
}

TEST (node, confirm_back)
{
nano::system system (24000, 1);
Expand Down
6 changes: 6 additions & 0 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ bool nano::block_processor::full ()
return (blocks.size () + state_blocks.size ()) > node.flags.block_processor_full_size;
}

bool nano::block_processor::half_full ()
{
std::unique_lock<std::mutex> lock (mutex);
return (blocks.size () + state_blocks.size ()) > node.flags.block_processor_full_size / 2;
}

void nano::block_processor::add (std::shared_ptr<nano::block> block_a, uint64_t origination)
{
nano::unchecked_info info (block_a, 0, origination, nano::signature_verification::unknown);
Expand Down
1 change: 1 addition & 0 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class block_processor final
void stop ();
void flush ();
bool full ();
bool half_full ();
void add (nano::unchecked_info const &);
void add (std::shared_ptr<nano::block>, uint64_t = 0);
void force (std::shared_ptr<nano::block>);
Expand Down
63 changes: 40 additions & 23 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ void nano::bulk_pull_client::request ()
req, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->receive_block ();
this_l->throttled_receive_block ();
}
else
{
Expand All @@ -355,6 +355,24 @@ void nano::bulk_pull_client::request ()
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_pull_client::throttled_receive_block ()
{
if (!connection->node->block_processor.half_full ())
{
receive_block ();
}
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_l]() {
if (!this_l->connection->attempt->stopped)
{
this_l->throttled_receive_block ();
}
});
}
}

void nano::bulk_pull_client::receive_block ()
{
auto this_l (shared_from_this ());
Expand Down Expand Up @@ -918,14 +936,7 @@ void nano::bootstrap_attempt::run ()
{
if (!pulls.empty ())
{
if (!node->block_processor.full ())
{
request_pull (lock);
}
else
{
condition.wait_for (lock, std::chrono::seconds (15));
}
request_pull (lock);
}
else
{
Expand Down Expand Up @@ -1356,14 +1367,7 @@ void nano::bootstrap_attempt::lazy_run ()
{
if (!pulls.empty ())
{
if (!node->block_processor.full ())
{
request_pull (lock);
}
else
{
condition.wait_for (lock, std::chrono::seconds (15));
}
request_pull (lock);
}
else
{
Expand Down Expand Up @@ -2400,7 +2404,7 @@ class request_response_visitor : public nano::message_visitor
void bulk_push (nano::bulk_push const &) override
{
auto response (std::make_shared<nano::bulk_push_server> (connection));
response->receive ();
response->throttled_receive ();
}
void frontier_req (nano::frontier_req const &) override
{
Expand Down Expand Up @@ -3052,6 +3056,22 @@ connection (connection_a)
receive_buffer->resize (256);
}


void nano::bulk_push_server::throttled_receive ()
{
if (!connection->node->block_processor.half_full ())
{
receive ();
}
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_l]() {
this_l->throttled_receive ();
});
}
}

void nano::bulk_push_server::receive ()
{
if (connection->node->bootstrap_initiator.in_progress ())
Expand Down Expand Up @@ -3150,11 +3170,8 @@ void nano::bulk_push_server::received_block (boost::system::error_code const & e
auto block (nano::deserialize_block (stream, type_a));
if (block != nullptr && !nano::work_validate (*block))
{
if (!connection->node->block_processor.full ())
{
connection->node->process_active (std::move (block));
}
receive ();
connection->node->process_active (std::move (block));
throttled_receive ();
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions nano/node/bootstrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class bulk_pull_client final : public std::enable_shared_from_this<nano::bulk_pu
~bulk_pull_client ();
void request ();
void receive_block ();
void throttled_receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
nano::block_hash first ();
Expand Down Expand Up @@ -374,6 +375,7 @@ class bulk_push_server final : public std::enable_shared_from_this<nano::bulk_pu
{
public:
explicit bulk_push_server (std::shared_ptr<nano::bootstrap_server> const &);
void throttled_receive ();
void receive ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
Expand Down