From db9fad225161325507b3f53651fd43dbdb00086e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 18 Mar 2024 20:16:14 +0100 Subject: [PATCH 1/3] Rework tcp_server handshake visitor --- nano/lib/stats_enums.hpp | 9 ++ nano/node/transport/tcp_server.cpp | 160 +++++++++++++++++++---------- nano/node/transport/tcp_server.hpp | 13 ++- 3 files changed, 124 insertions(+), 58 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index de7b6d020c..b1baaa9631 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -235,6 +235,15 @@ enum class detail : uint8_t tcp_read_error, tcp_write_error, + // tcp_server + handshake, + handshake_abort, + handshake_error, + handshake_network_error, + handshake_initiate, + handshake_response, + handshake_response_invalid, + // ipc invocations, diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index bea5f4f9ba..22608310ff 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -301,7 +301,7 @@ nano::transport::tcp_server::~tcp_server () return; } - node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", fmt::streamed (remote_endpoint)); if (socket->type () == nano::transport::socket::type_t::bootstrap) { @@ -341,7 +341,7 @@ void nano::transport::tcp_server::start () return; } - node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", fmt::streamed (remote_endpoint)); receive_message (); } @@ -374,7 +374,7 @@ void nano::transport::tcp_server::receive_message () node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})", ec.message (), to_string (this_l->message_deserializer->status), - nano::util::to_str (this_l->remote_endpoint)); + fmt::streamed (this_l->remote_endpoint)); this_l->stop (); } @@ -412,7 +412,7 @@ void nano::transport::tcp_server::received_message (std::unique_ptrlogger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", to_string (message_deserializer->status), - nano::util::to_str (remote_endpoint)); + fmt::streamed (remote_endpoint)); } } @@ -420,6 +420,10 @@ void nano::transport::tcp_server::received_message (std::unique_ptr message) @@ -429,6 +433,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrstats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in); debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ()); @@ -448,27 +453,39 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (handshake_visitor); - if (handshake_visitor.process) - { - queue_realtime (std::move (message)); - return true; - } - else if (handshake_visitor.bootstrap) + + switch (handshake_visitor.result) { - if (!to_bootstrap_connection ()) + case handshake_message_visitor::status::abort: { - stop (); - return false; + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); + node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); + + return false; // Stop receiving new messages } - } - else - { - // Neither handshake nor bootstrap received when in handshake mode - node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})", - nano::to_string (message->header.type), - nano::util::to_str (remote_endpoint)); + case handshake_message_visitor::status::progress: + { + return true; // Continue handshake + } + case handshake_message_visitor::status::realtime: + { + queue_realtime (std::move (message)); + return true; // Continue receiving new messages + } + case handshake_message_visitor::status::bootstrap: + { + if (!to_bootstrap_connection ()) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return true; + return false; // Switch failed, stop receiving new messages + } + else + { + // Fall through to process the bootstrap message + } + } } } else if (is_realtime_connection ()) @@ -481,7 +498,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrflags.disable_tcp_realtime) { - node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint)); - // Stop invalid handshake - server->stop (); - return; + result = status::abort; + return; // Abort } + if (!message.query && !message.response) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint)); - if (message.query && server->handshake_query_received) + result = status::abort; + return; // Abort + } + if (message.query && server->handshake_received) // Second handshake message should be a response only { - node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint)); - // Stop invalid handshake - server->stop (); - return; + result = status::abort; + return; // Abort } - server->handshake_query_received = true; + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", nano::util::to_str (server->remote_endpoint)); + server->handshake_received = true; + + node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint)); if (message.query) { + // Sends response + our own query server->send_handshake_response (*message.query, message.is_v2 ()); + result = status::progress; // Continue handshake } if (message.response) { if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint))) { - server->to_realtime_connection (message.response->node_id); + bool success = server->to_realtime_connection (message.response->node_id); + if (success) + { + result = status::realtime; // Switched to realtime + } + else + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint)); + + result = status::abort; + return; + } } else { - // Stop invalid handshake - server->stop (); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint)); + + result = status::abort; return; } } - - process = true; } void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) @@ -568,11 +609,13 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha { return; } + auto response = node->network.prepare_handshake_response (query, v2); auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response }; - // TODO: Use channel + node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint)); + auto shared_const_buffer = handshake_response.to_shared_const_buffer (); socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { auto node = this_l->node.lock (); @@ -582,36 +625,38 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha } if (ec) { - node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint)); // Stop invalid handshake this_l->stop (); } else { - node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out); } }); } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message) { - bootstrap = true; + result = status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message) { - bootstrap = true; + result = status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message) { - bootstrap = true; + result = status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message) { - bootstrap = true; + result = status::bootstrap; } /* @@ -780,7 +825,7 @@ void nano::transport::tcp_server::timeout () } if (socket->has_timed_out ()) { - node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint)); { nano::lock_guard lock{ node->tcp_listener->mutex }; @@ -834,7 +879,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection () ++node->tcp_listener->bootstrap_count; socket->type_set (nano::transport::socket::type_t::bootstrap); - node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", fmt::streamed (remote_endpoint)); return true; } @@ -846,17 +891,22 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & { return false; } - if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime) + if (node->flags.disable_tcp_realtime) + { + return false; + } + if (socket->type () != nano::transport::socket::type_t::undefined) { - remote_node_id = node_id; - ++node->tcp_listener->realtime_count; - socket->type_set (nano::transport::socket::type_t::realtime); + return false; + } - node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", nano::util::to_str (remote_endpoint)); + remote_node_id = node_id; + ++node->tcp_listener->realtime_count; + socket->type_set (nano::transport::socket::type_t::realtime); - return true; - } - return false; + node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint)); + + return true; } bool nano::transport::tcp_server::is_undefined_connection () const diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 0760241164..94cac72434 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -69,7 +69,7 @@ class tcp_server final : public std::enable_shared_from_this std::weak_ptr const node; nano::mutex mutex; std::atomic stopped{ false }; - std::atomic handshake_query_received{ false }; + std::atomic handshake_received{ false }; // Remote enpoint used to remove response channel even after socket closing nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{}; @@ -99,8 +99,15 @@ class tcp_server final : public std::enable_shared_from_this class handshake_message_visitor : public nano::message_visitor { public: - bool process{ false }; - bool bootstrap{ false }; + enum class status + { + abort, + progress, + realtime, + bootstrap, + }; + + status result{ status::abort }; explicit handshake_message_visitor (std::shared_ptr); From 3603b5fe87d01fdb2a0312d6ba4c7c867e199cfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 18 Mar 2024 22:44:06 +0100 Subject: [PATCH 2/3] Simplify message visitor --- nano/core_test/network.cpp | 2 +- nano/node/transport/tcp_server.cpp | 158 ++++++++++++++++------------- nano/node/transport/tcp_server.hpp | 43 +++++--- 3 files changed, 114 insertions(+), 89 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index c7152c6641..fe8bcfb6fb 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -722,7 +722,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) ASSERT_FALSE (ec); }); }); - ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0); + ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0); { nano::lock_guard guard (node0->tcp_listener->mutex); ASSERT_EQ (node0->tcp_listener->connections.size (), 1); diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 22608310ff..e6ca627824 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -392,10 +392,11 @@ void nano::transport::tcp_server::received_message (std::unique_ptrstatus != transport::parse_status::success); node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status)); + + // Avoid too much noise about `duplicate_publish_message` errors if (message_deserializer->status == transport::parse_status::duplicate_publish_message) { node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); } else { - // Avoid too much noise about `duplicate_publish_message` errors node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", to_string (message_deserializer->status), fmt::streamed (remote_endpoint)); } } - if (should_continue) - { - receive_message (); - } - else + switch (result) { - stop (); + case process_result::progress: + { + receive_message (); + } + break; + case process_result::abort: + { + stop (); + } + break; + case process_result::pause: + { + // Do nothing + } + break; } } -bool nano::transport::tcp_server::process_message (std::unique_ptr message) +auto nano::transport::tcp_server::process_message (std::unique_ptr message) -> process_result { auto node = this->node.lock (); if (!node) { - return false; + return process_result::abort; } - node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in); + node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in); debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ()); @@ -451,35 +463,36 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (handshake_visitor); switch (handshake_visitor.result) { - case handshake_message_visitor::status::abort: + case handshake_status::abort: { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return false; // Stop receiving new messages + return process_result::abort; } - case handshake_message_visitor::status::progress: + case handshake_status::progress: { - return true; // Continue handshake + return process_result::progress; // Continue handshake } - case handshake_message_visitor::status::realtime: + case handshake_status::realtime: { queue_realtime (std::move (message)); - return true; // Continue receiving new messages + return process_result::progress; // Continue receiving new messages } - case handshake_message_visitor::status::bootstrap: + case handshake_status::bootstrap: { - if (!to_bootstrap_connection ()) + bool success = to_bootstrap_connection (); + if (!success) { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return false; // Switch failed, stop receiving new messages + return process_result::abort; // Switch failed, abort } else { @@ -492,21 +505,26 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (realtime_visitor); + if (realtime_visitor.process) { queue_realtime (std::move (message)); } - return true; + + return process_result::progress; } // The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if` if (is_bootstrap_connection ()) { bootstrap_message_visitor bootstrap_visitor{ shared_from_this () }; message->visit (bootstrap_visitor); - return !bootstrap_visitor.processed; // Stop receiving new messages if bootstrap serving started + + // Pause receiving new messages if bootstrap serving started + return bootstrap_visitor.processed ? process_result::pause : process_result::progress; } + debug_assert (false); - return true; // Continue receiving new messages + return process_result::abort; } void nano::transport::tcp_server::queue_realtime (std::unique_ptr message) @@ -519,87 +537,75 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket }); } -/* - * Handshake - */ - -nano::transport::tcp_server::handshake_message_visitor::handshake_message_visitor (std::shared_ptr server) : - server{ std::move (server) } +auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status { -} - -void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message) -{ - auto node = server->node.lock (); + auto node = this->node.lock (); if (!node) { - return; + return handshake_status::abort; } if (node->flags.disable_tcp_realtime) { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; // Abort + return handshake_status::abort; } if (!message.query && !message.response) { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; // Abort + return handshake_status::abort; } - if (message.query && server->handshake_received) // Second handshake message should be a response only + if (message.query && handshake_received) // Second handshake message should be a response only { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; // Abort + return handshake_status::abort; } node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - server->handshake_received = true; + handshake_received = true; - node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (remote_endpoint)); if (message.query) { // Sends response + our own query - server->send_handshake_response (*message.query, message.is_v2 ()); - result = status::progress; // Continue handshake + send_handshake_response (*message.query, message.is_v2 ()); + // Fall through and continue handshake } if (message.response) { - if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint))) + if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint))) { - bool success = server->to_realtime_connection (message.response->node_id); + bool success = to_realtime_connection (message.response->node_id); if (success) { - result = status::realtime; // Switched to realtime + return handshake_status::realtime; // Switched to realtime } else { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; + return handshake_status::abort; } } else { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); - node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; + return handshake_status::abort; } } + + return handshake_status::progress; } void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) @@ -614,7 +620,7 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response }; - node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", fmt::streamed (remote_endpoint)); auto shared_const_buffer = handshake_response.to_shared_const_buffer (); socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { @@ -639,35 +645,39 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha }); } +/* + * handshake_message_visitor + */ + +void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message) +{ + result = server.process_handshake (message); +} + void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } /* - * Realtime + * realtime_message_visitor */ -nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor (nano::transport::tcp_server & server_a) : - server{ server_a } -{ -} - void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message) { process = true; @@ -729,7 +739,7 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const } /* - * Bootstrap + * bootstrap_message_visitor */ nano::transport::tcp_server::bootstrap_message_visitor::bootstrap_message_visitor (std::shared_ptr server) : @@ -814,6 +824,10 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const processed = true; } +/* + * + */ + // TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts, // and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup void nano::transport::tcp_server::timeout () diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 94cac72434..cde01c25f4 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -76,12 +76,16 @@ class tcp_server final : public std::enable_shared_from_this std::chrono::steady_clock::time_point last_telemetry_req{}; private: - void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + enum class process_result + { + abort, + progress, + pause, + }; void receive_message (); void received_message (std::unique_ptr message); - bool process_message (std::unique_ptr message); - + process_result process_message (std::unique_ptr message); void queue_realtime (std::unique_ptr message); bool to_bootstrap_connection (); @@ -90,26 +94,30 @@ class tcp_server final : public std::enable_shared_from_this bool is_bootstrap_connection () const; bool is_realtime_connection () const; + enum class handshake_status + { + abort, + progress, + realtime, + bootstrap, + }; + + handshake_status process_handshake (nano::node_id_handshake const & message); + void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + private: bool const allow_bootstrap; std::shared_ptr message_deserializer; std::optional last_keepalive; -private: +private: // Visitors class handshake_message_visitor : public nano::message_visitor { public: - enum class status - { - abort, - progress, - realtime, - bootstrap, - }; - - status result{ status::abort }; + handshake_status result{ handshake_status::abort }; - explicit handshake_message_visitor (std::shared_ptr); + explicit handshake_message_visitor (tcp_server & server) : + server{ server } {}; void node_id_handshake (nano::node_id_handshake const &) override; void bulk_pull (nano::bulk_pull const &) override; @@ -118,7 +126,7 @@ class tcp_server final : public std::enable_shared_from_this void frontier_req (nano::frontier_req const &) override; private: - std::shared_ptr server; + tcp_server & server; }; class realtime_message_visitor : public nano::message_visitor @@ -126,7 +134,8 @@ class tcp_server final : public std::enable_shared_from_this public: bool process{ false }; - explicit realtime_message_visitor (tcp_server &); + explicit realtime_message_visitor (tcp_server & server) : + server{ server } {}; void keepalive (nano::keepalive const &) override; void publish (nano::publish const &) override; @@ -157,5 +166,7 @@ class tcp_server final : public std::enable_shared_from_this private: std::shared_ptr server; }; + + friend class handshake_message_visitor; }; } From dd3da51a9b2164865f06c5a37aef518b936aeebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 19 Mar 2024 19:59:49 +0100 Subject: [PATCH 3/3] Fixes --- nano/node/transport/tcp_server.cpp | 9 ++++----- nano/node/transport/tcp_server.hpp | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index e6ca627824..a10d85dd54 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -475,7 +475,7 @@ auto nano::transport::tcp_server::process_message (std::unique_ptrstats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - handshake_received = true; - node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + node->logger.debug (nano::log::type::tcp_server, "Handshake message received ({})", fmt::streamed (remote_endpoint)); if (message.query) { @@ -605,7 +604,7 @@ auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake con } } - return handshake_status::progress; + return handshake_status::handshake; // Handshake is in progress } void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index cde01c25f4..e8a220bc14 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -97,7 +97,7 @@ class tcp_server final : public std::enable_shared_from_this enum class handshake_status { abort, - progress, + handshake, realtime, bootstrap, };