Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk pull account #1039

Merged
merged 5 commits into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rai/core_test/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class test_visitor : public rai::message_visitor
confirm_req_count (0),
confirm_ack_count (0),
bulk_pull_count (0),
bulk_pull_account_count (0),
bulk_pull_blocks_count (0),
bulk_push_count (0),
frontier_req_count (0)
Expand All @@ -37,6 +38,10 @@ class test_visitor : public rai::message_visitor
{
++bulk_pull_count;
}
void bulk_pull_account (rai::bulk_pull_account const &) override
{
++bulk_pull_account_count;
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
++bulk_pull_blocks_count;
Expand All @@ -58,6 +63,7 @@ class test_visitor : public rai::message_visitor
uint64_t confirm_req_count;
uint64_t confirm_ack_count;
uint64_t bulk_pull_count;
uint64_t bulk_pull_account_count;
uint64_t bulk_pull_blocks_count;
uint64_t bulk_push_count;
uint64_t frontier_req_count;
Expand Down
337 changes: 337 additions & 0 deletions rai/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,15 @@ void rai::bootstrap_server::receive_header_action (boost::system::error_code con
});
break;
}
case rai::message_type::bulk_pull_account:
{
node->stats.inc (rai::stat::type::bootstrap, rai::stat::detail::bulk_pull_account, rai::stat::dir::in);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, sizeof (rai::uint256_union) + sizeof (rai::uint128_union) + sizeof (uint8_t), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_bulk_pull_account_action (ec, size_a, header);
});
break;
}
case rai::message_type::bulk_pull_blocks:
{
node->stats.inc (rai::stat::type::bootstrap, rai::stat::detail::bulk_pull_blocks, rai::stat::dir::in);
Expand Down Expand Up @@ -1403,6 +1412,26 @@ void rai::bootstrap_server::receive_bulk_pull_action (boost::system::error_code
}
}

void rai::bootstrap_server::receive_bulk_pull_account_action (boost::system::error_code const & ec, size_t size_a, rai::message_header const & header_a)
{
if (!ec)
{
auto error (false);
assert (size_a == (sizeof (rai::uint256_union) + sizeof (rai::uint128_union) + sizeof (uint8_t)));
rai::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<rai::bulk_pull_account> request (new rai::bulk_pull_account (error, stream, header_a));
if (!error)
{
if (node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % request->account.to_account () % rai::amount (request->minimum_amount).format_balance (rai::Mxrb_ratio, 10, true));
}
add_request (std::unique_ptr<rai::message> (request.release ()));
receive ();
}
}
}

void rai::bootstrap_server::receive_bulk_pull_blocks_action (boost::system::error_code const & ec, size_t size_a, rai::message_header const & header_a)
{
if (!ec)
Expand Down Expand Up @@ -1500,6 +1529,11 @@ class request_response_visitor : public rai::message_visitor
auto response (std::make_shared<rai::bulk_pull_server> (connection, std::unique_ptr<rai::bulk_pull> (static_cast<rai::bulk_pull *> (connection->requests.front ().release ()))));
response->send_next ();
}
void bulk_pull_account (rai::bulk_pull_account const &) override
{
auto response (std::make_shared<rai::bulk_pull_account_server> (connection, std::unique_ptr<rai::bulk_pull_account> (static_cast<rai::bulk_pull_account *> (connection->requests.front ().release ()))));
response->send_frontier ();
}
void bulk_pull_blocks (rai::bulk_pull_blocks const &) override
{
auto response (std::make_shared<rai::bulk_pull_blocks_server> (connection, std::unique_ptr<rai::bulk_pull_blocks> (static_cast<rai::bulk_pull_blocks *> (connection->requests.front ().release ()))));
Expand Down Expand Up @@ -1697,6 +1731,309 @@ send_buffer (std::make_shared<std::vector<uint8_t>> ())
set_current_end ();
}

/**
* Bulk pull blocks related to an account
*/
void rai::bulk_pull_account_server::set_params ()
{
assert (request != nullptr);

/*
* Parse the flags
*/
invalid_request = false;
if (request->flags == rai::bulk_pull_account_flags::pending_address_only)
{
pending_address_only = true;
}
else if (request->flags == rai::bulk_pull_account_flags::pending_hash_and_amount)
{
pending_address_only = false;
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Invalid bulk_pull_account flags supplied %1%") % static_cast<uint8_t> (request->flags));
}

invalid_request = true;

return;
}

/*
* Initialize the current item from the requested account
*/
current_key.account = request->account;
current_key.hash = 0;
}

void rai::bulk_pull_account_server::send_frontier ()
{
/*
* This function is really the entry point into this class,
* so handle the invalid_request case by terminating the
* request without any response
*/
if (invalid_request)
{
connection->finish_request ();

return;
}

/*
* Supply the account frontier
*/
/**
** Establish a database transaction
**/
rai::transaction stream_transaction (connection->node->store.environment, nullptr, false);

/**
** Get account balance and frontier block hash
**/
auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account));
rai::uint128_union account_frontier_balance (account_frontier_balance_int);

/**
** Write the frontier block hash and balance into a buffer
**/
send_buffer->clear ();
{
rai::vectorstream output_stream (*send_buffer);

write (output_stream, account_frontier_hash.bytes);
write (output_stream, account_frontier_balance.bytes);
}

/**
** Send the buffer to the requestor
**/
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}

void rai::bulk_pull_account_server::send_next_block ()
{
/*
* Get the next item from the queue, it is a tuple with the key (which
* contains the account and hash) and data (which contains the amount)
*/
auto block_data (get_next ());
auto block_info_key (block_data.first.get ());
auto block_info (block_data.second.get ());

if (block_info_key != nullptr)
{
/*
* If we have a new item, emit it to the socket
*/
send_buffer->clear ();

if (pending_address_only)
{
rai::vectorstream output_stream (*send_buffer);

if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending address: %1%") % block_info->source.to_string ());
}

write (output_stream, block_info->source.bytes);
}
else
{
rai::vectorstream output_stream (*send_buffer);

if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending block: %1%") % block_info_key->hash.to_string ());
}

write (output_stream, block_info_key->hash.bytes);
write (output_stream, block_info->amount.bytes);
}

auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
else
{
/*
* Otherwise, finalize the connection
*/
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Done sending blocks"));
}

send_finished ();
}
}

std::pair<std::unique_ptr<rai::pending_key>, std::unique_ptr<rai::pending_info>> rai::bulk_pull_account_server::get_next ()
{
std::pair<std::unique_ptr<rai::pending_key>, std::unique_ptr<rai::pending_info>> result;

while (true)
{
/*
* For each iteration of this loop, establish and then
* destroy a database transaction, to avoid locking the
* database for a prolonged period.
*/
rai::transaction stream_transaction (connection->node->store.environment, nullptr, false);
auto stream (connection->node->store.pending_begin (stream_transaction, current_key));

if (stream->first == nullptr)
{
break;
}

rai::pending_key key (stream->first);
rai::pending_info info (stream->second);

/*
* Get the key for the next value, to use in the next call or iteration
*/
current_key.account = key.account;
current_key.hash = key.hash.number () + 1;

/*
* Finish up if the response is for a different account
*/
if (key.account != request->account)
{
break;
}

/*
* Skip entries where the amount is less than the requested
* minimum
*/
if (info.amount < request->minimum_amount)
{
continue;
}

/*
* If the pending_address_only flag is set, de-duplicate the
* responses. The responses are the address of the sender,
* so they are are part of the pending table's information
* and not key, so we have to de-duplicate them manually.
*/
if (pending_address_only)
{
if (deduplication.count (info.source) != 0)
{
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick question about this: We don't seem to use continue anywhere else, and only a handful of loop breaks. Maybe this is considered under the same umbrella as early returns? If so, it could be restructured to drop the break and continue statements, but maybe it's just personal preference.

}

deduplication.insert ({ info.source, true });
}

result.first = std::unique_ptr<rai::pending_key> (new rai::pending_key (key));
result.second = std::unique_ptr<rai::pending_info> (new rai::pending_info (info));

break;
}

return result;
}

void rai::bulk_pull_account_server::sent_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
send_next_block ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ());
}
}
}

void rai::bulk_pull_account_server::send_finished ()
{
/*
* The "bulk_pull_account" final sequence is a final block of all
* zeros. If we are sending only account public keys (with the
* "pending_address_only" flag) then it will be 256-bits of zeros,
* otherwise it will be 384-bits of zeros.
*/
send_buffer->clear ();

{
rai::vectorstream output_stream (*send_buffer);
rai::uint256_union account_zero (0);
rai::uint128_union balance_zero (0);

write (output_stream, account_zero.bytes);

if (!pending_address_only)
{
write (output_stream, balance_zero.bytes);
}
}

auto this_l (shared_from_this ());

if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << "Bulk sending for an account finished";
}

connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->complete (ec, size_a);
});
}

void rai::bulk_pull_account_server::complete (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
if (pending_address_only)
{
assert (size_a == 32);
}
else
{
assert (size_a == 48);
}

connection->finish_request ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << "Unable to pending-as-zero";
}
}
}

rai::bulk_pull_account_server::bulk_pull_account_server (std::shared_ptr<rai::bootstrap_server> const & connection_a, std::unique_ptr<rai::bulk_pull_account> request_a) :
connection (connection_a),
request (std::move (request_a)),
send_buffer (std::make_shared<std::vector<uint8_t>> ()),
current_key (0, 0)
{
/*
* Setup the streaming response for the first call to "send_frontier" and "send_next_block"
*/
set_params ();
}

/**
* Bulk pull of a range of blocks, or a checksum for a range of
* blocks [min_hash, max_hash) up to a max of max_count. mode
Expand Down
Loading