Skip to content

Commit

Permalink
Throttled received block for bootstrap (#2216)
Browse files Browse the repository at this point in the history
* Throttled received block for bootstrap

* bulk_push_server::throttled_receive ()

* block_processor_full & block_processor_half_full tests

* Extract nano::block_processor::size () function
  • Loading branch information
SergiySW authored and guilhermelawless committed Aug 15, 2019
1 parent 3209fbe commit ce285af
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 25 deletions.
57 changes: 57 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2802,6 +2802,63 @@ TEST (node, block_processor_reject_rolled_back)
ASSERT_TRUE (node.active.empty ());
}

TEST (node, block_processor_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 2;
auto & node = *system.add_node (nano::node_config (24000, system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send1);
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send2);
auto send3 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send2->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 3 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send3);
// The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::confirmation_height);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.full ());
node.block_processor.add (send2);
ASSERT_FALSE (node.block_processor.full ());
node.block_processor.add (send3);
// Block processor may be not full during state blocks signatures verification
system.deadline_set (2s);
while (!node.block_processor.full ())
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (node, block_processor_half_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 4;
auto & node = *system.add_node (nano::node_config (24000, system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send1);
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send2);
auto send3 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send2->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 3 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node.work_generate_blocking (*send3);
// The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::confirmation_height);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send3);
// Block processor may be not half_full during state blocks signatures verification
system.deadline_set (2s);
while (!node.block_processor.half_full ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_FALSE (node.block_processor.full ());
}

TEST (node, confirm_back)
{
nano::system system (24000, 1);
Expand Down
14 changes: 12 additions & 2 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,20 @@ void nano::block_processor::flush ()
}
}

bool nano::block_processor::full ()
size_t nano::block_processor::size ()
{
std::unique_lock<std::mutex> lock (mutex);
return (blocks.size () + state_blocks.size ()) > node.flags.block_processor_full_size;
return (blocks.size () + state_blocks.size () + forced.size ());
}

bool nano::block_processor::full ()
{
return size () > node.flags.block_processor_full_size;
}

bool nano::block_processor::half_full ()
{
return size () > node.flags.block_processor_full_size / 2;
}

void nano::block_processor::add (std::shared_ptr<nano::block> block_a, uint64_t origination)
Expand Down
2 changes: 2 additions & 0 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class block_processor final
~block_processor ();
void stop ();
void flush ();
size_t size ();
bool full ();
bool half_full ();
void add (nano::unchecked_info const &);
void add (std::shared_ptr<nano::block>, uint64_t = 0);
void force (std::shared_ptr<nano::block>);
Expand Down
62 changes: 39 additions & 23 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ void nano::bulk_pull_client::request ()
req, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->receive_block ();
this_l->throttled_receive_block ();
}
else
{
Expand All @@ -355,6 +355,24 @@ void nano::bulk_pull_client::request ()
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_pull_client::throttled_receive_block ()
{
if (!connection->node->block_processor.half_full ())
{
receive_block ();
}
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_l]() {
if (!this_l->connection->attempt->stopped)
{
this_l->throttled_receive_block ();
}
});
}
}

void nano::bulk_pull_client::receive_block ()
{
auto this_l (shared_from_this ());
Expand Down Expand Up @@ -918,14 +936,7 @@ void nano::bootstrap_attempt::run ()
{
if (!pulls.empty ())
{
if (!node->block_processor.full ())
{
request_pull (lock);
}
else
{
condition.wait_for (lock, std::chrono::seconds (15));
}
request_pull (lock);
}
else
{
Expand Down Expand Up @@ -1356,14 +1367,7 @@ void nano::bootstrap_attempt::lazy_run ()
{
if (!pulls.empty ())
{
if (!node->block_processor.full ())
{
request_pull (lock);
}
else
{
condition.wait_for (lock, std::chrono::seconds (15));
}
request_pull (lock);
}
else
{
Expand Down Expand Up @@ -2400,7 +2404,7 @@ class request_response_visitor : public nano::message_visitor
void bulk_push (nano::bulk_push const &) override
{
auto response (std::make_shared<nano::bulk_push_server> (connection));
response->receive ();
response->throttled_receive ();
}
void frontier_req (nano::frontier_req const &) override
{
Expand Down Expand Up @@ -3052,6 +3056,21 @@ connection (connection_a)
receive_buffer->resize (256);
}

void nano::bulk_push_server::throttled_receive ()
{
if (!connection->node->block_processor.half_full ())
{
receive ();
}
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_l]() {
this_l->throttled_receive ();
});
}
}

void nano::bulk_push_server::receive ()
{
if (connection->node->bootstrap_initiator.in_progress ())
Expand Down Expand Up @@ -3150,11 +3169,8 @@ void nano::bulk_push_server::received_block (boost::system::error_code const & e
auto block (nano::deserialize_block (stream, type_a));
if (block != nullptr && !nano::work_validate (*block))
{
if (!connection->node->block_processor.full ())
{
connection->node->process_active (std::move (block));
}
receive ();
connection->node->process_active (std::move (block));
throttled_receive ();
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions nano/node/bootstrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class bulk_pull_client final : public std::enable_shared_from_this<nano::bulk_pu
~bulk_pull_client ();
void request ();
void receive_block ();
void throttled_receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
nano::block_hash first ();
Expand Down Expand Up @@ -374,6 +375,7 @@ class bulk_push_server final : public std::enable_shared_from_this<nano::bulk_pu
{
public:
explicit bulk_push_server (std::shared_ptr<nano::bootstrap_server> const &);
void throttled_receive ();
void receive ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
Expand Down

0 comments on commit ce285af

Please sign in to comment.