Skip to content

Commit

Permalink
Decouple message deserializer from the TCP socket (#4160)
Browse files Browse the repository at this point in the history
* message_deserializer without socket (part 1)
* message_deserializer without socket (part 2)
* Add a guard clause for message_deserializer::channel_read_fn
* Modify inproc channel to use message_deserializer
* Renaming to read_query and read_op for name simplicity.
* Moving socket related code to a member function of socket.
* Avoiding possible memory traps for the shared pointers
* Getting a socket copy instead of tcp_server's

---------

Co-authored-by: clemahieu <[email protected]>
  • Loading branch information
thsfs and clemahieu authored Mar 1, 2023
1 parent 9f682b4 commit a60b60a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 35 deletions.
42 changes: 31 additions & 11 deletions nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/message_deserializer.hpp>

#include <boost/format.hpp>

Expand Down Expand Up @@ -53,23 +55,41 @@ class message_visitor_inbound : public nano::message_visitor
*/
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a)
{
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);
auto offset = 0u;
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (buffer_v.size () >= (offset + size_a));
data_a->resize (size_a);
auto const copy_start = buffer_v.begin () + offset;
std::copy (copy_start, copy_start + size_a, data_a->data ());
offset += size_a;
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size_a);
};

// create an inbound message visitor class to handle incoming messages because that's what the message parser expects
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
auto const message_deserializer = std::make_shared<nano::transport::message_deserializer> (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer, buffer_read_fn);
message_deserializer->read (
[this] (boost::system::error_code ec_a, std::unique_ptr<nano::message> message_a) {
if (ec_a || !message_a)
{
return;
}

nano::message_parser parser{ destination.network.publish_filter, destination.block_uniquer, destination.vote_uniquer, visitor, destination.work, destination.network_params.network };
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);

// parse the message and action any work that needs to be done on that object via the visitor object
auto bytes = buffer_a.to_bytes ();
auto size = bytes.size ();
parser.deserialize_buffer (bytes.data (), size);
// process message
{
node.stats.inc (nano::stat::type::message, nano::to_stat_detail (message_a->header.type), nano::stat::dir::in);

// create an inbound message visitor class to handle incoming messages
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
message_a->visit (visitor);
}
});

if (callback_a)
{
node.background ([callback_a, size] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
node.background ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
});
}
}
Expand Down
27 changes: 12 additions & 15 deletions nano/node/transport/message_deserializer.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/message_deserializer.hpp>

nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a) :
nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a,
read_query read_op) :
read_buffer{ std::make_shared<std::vector<uint8_t>> () },
network_constants_m{ network_constants_a },
publish_filter_m{ publish_filter_a },
block_uniquer_m{ block_uniquer_a },
vote_uniquer_m{ vote_uniquer_a }
vote_uniquer_m{ vote_uniquer_a },
read_op{ std::move (read_op) }
{
debug_assert (this->read_op);
read_buffer->resize (MAX_MESSAGE_SIZE);
}

void nano::transport::message_deserializer::read (std::shared_ptr<nano::transport::socket> socket, const nano::transport::message_deserializer::callback_type && callback)
void nano::transport::message_deserializer::read (const nano::transport::message_deserializer::callback_type && callback)
{
debug_assert (callback);
debug_assert (read_op);

status = parse_status::none;

// Increase timeout to receive TCP header (idle server socket)
auto prev_timeout = socket->get_default_timeout_value ();
socket->set_default_timeout_value (network_constants_m.idle_timeout);

socket->async_read (read_buffer, HEADER_SIZE, [this_l = shared_from_this (), socket, callback = std::move (callback), prev_timeout] (boost::system::error_code const & ec, std::size_t size_a) {
read_op (read_buffer, HEADER_SIZE, [this_l = shared_from_this (), callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
if (ec)
{
callback (ec, nullptr);
Expand All @@ -32,15 +32,11 @@ void nano::transport::message_deserializer::read (std::shared_ptr<nano::transpor
callback (boost::asio::error::fault, nullptr);
return;
}

// Decrease timeout to default
socket->set_default_timeout_value (prev_timeout);

this_l->received_header (socket, std::move (callback));
this_l->received_header (std::move (callback));
});
}

void nano::transport::message_deserializer::received_header (std::shared_ptr<nano::transport::socket> socket, const nano::transport::message_deserializer::callback_type && callback)
void nano::transport::message_deserializer::received_header (const nano::transport::message_deserializer::callback_type && callback)
{
nano::bufferstream stream{ read_buffer->data (), HEADER_SIZE };
auto error = false;
Expand Down Expand Up @@ -86,7 +82,8 @@ void nano::transport::message_deserializer::received_header (std::shared_ptr<nan
}
else
{
socket->async_read (read_buffer, payload_size, [this_l = shared_from_this (), payload_size, header, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (read_op);
read_op (read_buffer, payload_size, [this_l = shared_from_this (), payload_size, header, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
if (ec)
{
callback (ec, nullptr);
Expand Down
13 changes: 6 additions & 7 deletions nano/node/transport/message_deserializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

#include <nano/node/common.hpp>
#include <nano/node/messages.hpp>
#include <nano/node/transport/socket.hpp>

#include <memory>
#include <vector>

namespace nano
{
class socket;

namespace transport
{
class message_deserializer : public std::enable_shared_from_this<nano::transport::message_deserializer>
Expand Down Expand Up @@ -45,19 +42,20 @@ namespace transport

parse_status status;

message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &);
using read_query = std::function<void (std::shared_ptr<std::vector<uint8_t>> const &, size_t, std::function<void (boost::system::error_code const &, std::size_t)>)>;
message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &, read_query read_op);

/*
* Asynchronously read next message from socket.
* Asynchronously read next message from the channel_read_fn.
* If an irrecoverable error is encountered callback will be called with an error code set and null message.
* If a 'soft' error is encountered (eg. duplicate block publish) error won't be set but message will be null. In that case, `status` field will be set to code indicating reason for failure.
* If message is received successfully, error code won't be set and message will be non-null. `status` field will be set to `success`.
* Should not be called until the previous invocation finishes and calls the callback.
*/
void read (std::shared_ptr<nano::transport::socket> socket, callback_type const && callback);
void read (callback_type const && callback);

private:
void received_header (std::shared_ptr<nano::transport::socket> socket, callback_type const && callback);
void received_header (callback_type const && callback);
void received_message (nano::message_header header, std::size_t payload_size, callback_type const && callback);

/*
Expand Down Expand Up @@ -90,6 +88,7 @@ namespace transport
nano::network_filter & publish_filter_m;
nano::block_uniquer & block_uniquer_m;
nano::vote_uniquer & vote_uniquer_m;
read_query read_op;

public:
static stat::detail to_stat_detail (parse_status);
Expand Down
11 changes: 11 additions & 0 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,17 @@ void nano::transport::socket::checkup ()
});
}

void nano::transport::socket::read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
{
// Increase timeout to receive TCP header (idle server socket)
auto const prev_timeout = get_default_timeout_value ();
set_default_timeout_value (node.network_params.network.idle_timeout);
async_read (data_a, size_a, [callback_l = std::move (callback_a), prev_timeout, this_l = shared_from_this ()] (boost::system::error_code const & ec_a, std::size_t size_a) {
this_l->set_default_timeout_value (prev_timeout);
callback_l (ec_a, size_a);
});
}

bool nano::transport::socket::has_timed_out () const
{
return timed_out;
Expand Down
2 changes: 2 additions & 0 deletions nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class server_socket;
class socket : public std::enable_shared_from_this<nano::transport::socket>
{
friend class server_socket;
friend class tcp_server;

public:
enum class type_t
Expand Down Expand Up @@ -172,6 +173,7 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
void set_last_completion ();
void set_last_receive_time ();
void checkup ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);

private:
type_t type_m{ type_t::undefined };
Expand Down
12 changes: 10 additions & 2 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <boost/format.hpp>

#include <memory>

nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a) :
node (node_a),
port (port_a)
Expand Down Expand Up @@ -125,7 +127,13 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr<nano::transport::socket
socket{ std::move (socket_a) },
node{ std::move (node_a) },
allow_bootstrap{ allow_bootstrap_a },
message_deserializer{ std::make_shared<nano::transport::message_deserializer> (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer) }
message_deserializer{
std::make_shared<nano::transport::message_deserializer> (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer,
[socket_l = socket] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (socket_l != nullptr);
socket_l->read_impl (data_a, size_a, callback_a);
})
}
{
debug_assert (socket != nullptr);
}
Expand Down Expand Up @@ -186,7 +194,7 @@ void nano::transport::tcp_server::receive_message ()
return;
}

message_deserializer->read (socket, [this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
message_deserializer->read ([this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
if (ec)
{
// IO error or critical error when deserializing message
Expand Down

0 comments on commit a60b60a

Please sign in to comment.