From a60b60aa1fcfc8ffdf13b40b2a67a0feb2a81a26 Mon Sep 17 00:00:00 2001 From: Thiago Silva <82097354+thsfs@users.noreply.github.com> Date: Wed, 1 Mar 2023 09:54:35 -0300 Subject: [PATCH] Decouple message deserializer from the TCP socket (#4160) * message_deserializer without socket (part 1) * message_deserializer without socket (part 2) * Add a guard clause for message_deserializer::channel_read_fn * Modify inproc channel to use message_deserializer * Renaming to read_query and read_op for name simplicity. * Moving socket related code to a member function of socket. * Avoiding possible memory traps for the shared pointers * Getting a socket copy instead of tcp_server's --------- Co-authored-by: clemahieu --- nano/node/transport/inproc.cpp | 42 +++++++++++++++----- nano/node/transport/message_deserializer.cpp | 27 ++++++------- nano/node/transport/message_deserializer.hpp | 13 +++--- nano/node/transport/socket.cpp | 11 +++++ nano/node/transport/socket.hpp | 2 + nano/node/transport/tcp_server.cpp | 12 +++++- 6 files changed, 72 insertions(+), 35 deletions(-) diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index 346f4c9f72..f3494267d5 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -1,5 +1,7 @@ +#include #include #include +#include #include @@ -53,23 +55,41 @@ class message_visitor_inbound : public nano::message_visitor */ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a) { - // we create a temporary channel for the reply path, in case the receiver of the message wants to reply - auto remote_channel = std::make_shared (destination, node); + auto offset = 0u; + auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) { + debug_assert (buffer_v.size () >= (offset + size_a)); + data_a->resize (size_a); + auto const copy_start = buffer_v.begin () + offset; + std::copy (copy_start, copy_start + size_a, data_a->data ()); + offset += size_a; + callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size_a); + }; - // create an inbound message visitor class to handle incoming messages because that's what the message parser expects - message_visitor_inbound visitor{ destination.network.inbound, remote_channel }; + auto const message_deserializer = std::make_shared (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer, buffer_read_fn); + message_deserializer->read ( + [this] (boost::system::error_code ec_a, std::unique_ptr message_a) { + if (ec_a || !message_a) + { + return; + } - nano::message_parser parser{ destination.network.publish_filter, destination.block_uniquer, destination.vote_uniquer, visitor, destination.work, destination.network_params.network }; + // we create a temporary channel for the reply path, in case the receiver of the message wants to reply + auto remote_channel = std::make_shared (destination, node); - // parse the message and action any work that needs to be done on that object via the visitor object - auto bytes = buffer_a.to_bytes (); - auto size = bytes.size (); - parser.deserialize_buffer (bytes.data (), size); + // process message + { + node.stats.inc (nano::stat::type::message, nano::to_stat_detail (message_a->header.type), nano::stat::dir::in); + + // create an inbound message visitor class to handle incoming messages + message_visitor_inbound visitor{ destination.network.inbound, remote_channel }; + message_a->visit (visitor); + } + }); if (callback_a) { - node.background ([callback_a, size] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size); + node.background ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () { + callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size); }); } } diff --git a/nano/node/transport/message_deserializer.cpp b/nano/node/transport/message_deserializer.cpp index e465e89ad0..661a70c511 100644 --- a/nano/node/transport/message_deserializer.cpp +++ b/nano/node/transport/message_deserializer.cpp @@ -1,27 +1,27 @@ #include #include -nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a) : +nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, +read_query read_op) : read_buffer{ std::make_shared> () }, network_constants_m{ network_constants_a }, publish_filter_m{ publish_filter_a }, block_uniquer_m{ block_uniquer_a }, - vote_uniquer_m{ vote_uniquer_a } + vote_uniquer_m{ vote_uniquer_a }, + read_op{ std::move (read_op) } { + debug_assert (this->read_op); read_buffer->resize (MAX_MESSAGE_SIZE); } -void nano::transport::message_deserializer::read (std::shared_ptr socket, const nano::transport::message_deserializer::callback_type && callback) +void nano::transport::message_deserializer::read (const nano::transport::message_deserializer::callback_type && callback) { debug_assert (callback); + debug_assert (read_op); status = parse_status::none; - // Increase timeout to receive TCP header (idle server socket) - auto prev_timeout = socket->get_default_timeout_value (); - socket->set_default_timeout_value (network_constants_m.idle_timeout); - - socket->async_read (read_buffer, HEADER_SIZE, [this_l = shared_from_this (), socket, callback = std::move (callback), prev_timeout] (boost::system::error_code const & ec, std::size_t size_a) { + read_op (read_buffer, HEADER_SIZE, [this_l = shared_from_this (), callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) { if (ec) { callback (ec, nullptr); @@ -32,15 +32,11 @@ void nano::transport::message_deserializer::read (std::shared_ptrset_default_timeout_value (prev_timeout); - - this_l->received_header (socket, std::move (callback)); + this_l->received_header (std::move (callback)); }); } -void nano::transport::message_deserializer::received_header (std::shared_ptr socket, const nano::transport::message_deserializer::callback_type && callback) +void nano::transport::message_deserializer::received_header (const nano::transport::message_deserializer::callback_type && callback) { nano::bufferstream stream{ read_buffer->data (), HEADER_SIZE }; auto error = false; @@ -86,7 +82,8 @@ void nano::transport::message_deserializer::received_header (std::shared_ptrasync_read (read_buffer, payload_size, [this_l = shared_from_this (), payload_size, header, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) { + debug_assert (read_op); + read_op (read_buffer, payload_size, [this_l = shared_from_this (), payload_size, header, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) { if (ec) { callback (ec, nullptr); diff --git a/nano/node/transport/message_deserializer.hpp b/nano/node/transport/message_deserializer.hpp index 331024b5f4..0a85454e51 100644 --- a/nano/node/transport/message_deserializer.hpp +++ b/nano/node/transport/message_deserializer.hpp @@ -2,15 +2,12 @@ #include #include -#include #include #include namespace nano { -class socket; - namespace transport { class message_deserializer : public std::enable_shared_from_this @@ -45,19 +42,20 @@ namespace transport parse_status status; - message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &); + using read_query = std::function> const &, size_t, std::function)>; + message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &, read_query read_op); /* - * Asynchronously read next message from socket. + * Asynchronously read next message from the channel_read_fn. * If an irrecoverable error is encountered callback will be called with an error code set and null message. * If a 'soft' error is encountered (eg. duplicate block publish) error won't be set but message will be null. In that case, `status` field will be set to code indicating reason for failure. * If message is received successfully, error code won't be set and message will be non-null. `status` field will be set to `success`. * Should not be called until the previous invocation finishes and calls the callback. */ - void read (std::shared_ptr socket, callback_type const && callback); + void read (callback_type const && callback); private: - void received_header (std::shared_ptr socket, callback_type const && callback); + void received_header (callback_type const && callback); void received_message (nano::message_header header, std::size_t payload_size, callback_type const && callback); /* @@ -90,6 +88,7 @@ namespace transport nano::network_filter & publish_filter_m; nano::block_uniquer & block_uniquer_m; nano::vote_uniquer & vote_uniquer_m; + read_query read_op; public: static stat::detail to_stat_detail (parse_status); diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index aee4ee353d..97dd1f2283 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -243,6 +243,17 @@ void nano::transport::socket::checkup () }); } +void nano::transport::socket::read_impl (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) +{ + // Increase timeout to receive TCP header (idle server socket) + auto const prev_timeout = get_default_timeout_value (); + set_default_timeout_value (node.network_params.network.idle_timeout); + async_read (data_a, size_a, [callback_l = std::move (callback_a), prev_timeout, this_l = shared_from_this ()] (boost::system::error_code const & ec_a, std::size_t size_a) { + this_l->set_default_timeout_value (prev_timeout); + callback_l (ec_a, size_a); + }); +} + bool nano::transport::socket::has_timed_out () const { return timed_out; diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index f92f13313f..bbdcb4fe93 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -39,6 +39,7 @@ class server_socket; class socket : public std::enable_shared_from_this { friend class server_socket; + friend class tcp_server; public: enum class type_t @@ -172,6 +173,7 @@ class socket : public std::enable_shared_from_this void set_last_completion (); void set_last_receive_time (); void checkup (); + void read_impl (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a); private: type_t type_m{ type_t::undefined }; diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index fe1c069006..974f39c71c 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -8,6 +8,8 @@ #include +#include + nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a) : node (node_a), port (port_a) @@ -125,7 +127,13 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer) } + message_deserializer{ + std::make_shared (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer, + [socket_l = socket] (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) { + debug_assert (socket_l != nullptr); + socket_l->read_impl (data_a, size_a, callback_a); + }) + } { debug_assert (socket != nullptr); } @@ -186,7 +194,7 @@ void nano::transport::tcp_server::receive_message () return; } - message_deserializer->read (socket, [this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr message) { + message_deserializer->read ([this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr message) { if (ec) { // IO error or critical error when deserializing message