diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index c7152c6641..fe8bcfb6fb 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -722,7 +722,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) ASSERT_FALSE (ec); }); }); - ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0); + ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0); { nano::lock_guard guard (node0->tcp_listener->mutex); ASSERT_EQ (node0->tcp_listener->connections.size (), 1); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index de7b6d020c..b1baaa9631 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -235,6 +235,15 @@ enum class detail : uint8_t tcp_read_error, tcp_write_error, + // tcp_server + handshake, + handshake_abort, + handshake_error, + handshake_network_error, + handshake_initiate, + handshake_response, + handshake_response_invalid, + // ipc invocations, diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index bea5f4f9ba..a10d85dd54 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -301,7 +301,7 @@ nano::transport::tcp_server::~tcp_server () return; } - node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", fmt::streamed (remote_endpoint)); if (socket->type () == nano::transport::socket::type_t::bootstrap) { @@ -341,7 +341,7 @@ void nano::transport::tcp_server::start () return; } - node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", fmt::streamed (remote_endpoint)); receive_message (); } @@ -374,7 +374,7 @@ void nano::transport::tcp_server::receive_message () node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})", ec.message (), to_string (this_l->message_deserializer->status), - nano::util::to_str (this_l->remote_endpoint)); + fmt::streamed (this_l->remote_endpoint)); this_l->stop (); } @@ -392,10 +392,11 @@ void nano::transport::tcp_server::received_message (std::unique_ptrstatus != transport::parse_status::success); node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status)); + + // Avoid too much noise about `duplicate_publish_message` errors if (message_deserializer->status == transport::parse_status::duplicate_publish_message) { node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); } else { - // Avoid too much noise about `duplicate_publish_message` errors node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", to_string (message_deserializer->status), - nano::util::to_str (remote_endpoint)); + fmt::streamed (remote_endpoint)); } } - if (should_continue) + switch (result) { - receive_message (); + case process_result::progress: + { + receive_message (); + } + break; + case process_result::abort: + { + stop (); + } + break; + case process_result::pause: + { + // Do nothing + } + break; } } -bool nano::transport::tcp_server::process_message (std::unique_ptr message) +auto nano::transport::tcp_server::process_message (std::unique_ptr message) -> process_result { auto node = this->node.lock (); if (!node) { - return false; + return process_result::abort; } - node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in); + + node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in); debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ()); @@ -446,50 +463,68 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (handshake_visitor); - if (handshake_visitor.process) - { - queue_realtime (std::move (message)); - return true; - } - else if (handshake_visitor.bootstrap) + + switch (handshake_visitor.result) { - if (!to_bootstrap_connection ()) + case handshake_status::abort: + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); + node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); + + return process_result::abort; + } + case handshake_status::handshake: { - stop (); - return false; + return process_result::progress; // Continue handshake } - } - else - { - // Neither handshake nor bootstrap received when in handshake mode - node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})", - nano::to_string (message->header.type), - nano::util::to_str (remote_endpoint)); + case handshake_status::realtime: + { + queue_realtime (std::move (message)); + return process_result::progress; // Continue receiving new messages + } + case handshake_status::bootstrap: + { + bool success = to_bootstrap_connection (); + if (!success) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return true; + return process_result::abort; // Switch failed, abort + } + else + { + // Fall through to process the bootstrap message + } + } } } else if (is_realtime_connection ()) { realtime_message_visitor realtime_visitor{ *this }; message->visit (realtime_visitor); + if (realtime_visitor.process) { queue_realtime (std::move (message)); } - return true; + + return process_result::progress; } - // the server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if` + // The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if` if (is_bootstrap_connection ()) { bootstrap_message_visitor bootstrap_visitor{ shared_from_this () }; message->visit (bootstrap_visitor); - return !bootstrap_visitor.processed; // Stop receiving new messages if bootstrap serving started + + // Pause receiving new messages if bootstrap serving started + return bootstrap_visitor.processed ? process_result::pause : process_result::progress; } + debug_assert (false); - return true; // Continue receiving new messages + return process_result::abort; } void nano::transport::tcp_server::queue_realtime (std::unique_ptr message) @@ -502,63 +537,74 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket }); } -/* - * Handshake - */ - -nano::transport::tcp_server::handshake_message_visitor::handshake_message_visitor (std::shared_ptr server) : - server{ std::move (server) } -{ -} - -void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message) +auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status { - auto node = server->node.lock (); + auto node = this->node.lock (); if (!node) { - return; + return handshake_status::abort; } + if (node->flags.disable_tcp_realtime) { - node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (remote_endpoint)); - // Stop invalid handshake - server->stop (); - return; + return handshake_status::abort; } + if (!message.query && !message.response) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (remote_endpoint)); - if (message.query && server->handshake_query_received) + return handshake_status::abort; + } + if (message.query && handshake_received) // Second handshake message should be a response only { - node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (remote_endpoint)); - // Stop invalid handshake - server->stop (); - return; + return handshake_status::abort; } - server->handshake_query_received = true; + handshake_received = true; - node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + node->logger.debug (nano::log::type::tcp_server, "Handshake message received ({})", fmt::streamed (remote_endpoint)); if (message.query) { - server->send_handshake_response (*message.query, message.is_v2 ()); + // Sends response + our own query + send_handshake_response (*message.query, message.is_v2 ()); + // Fall through and continue handshake } if (message.response) { - if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint))) + if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint))) { - server->to_realtime_connection (message.response->node_id); + bool success = to_realtime_connection (message.response->node_id); + if (success) + { + return handshake_status::realtime; // Switched to realtime + } + else + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (remote_endpoint)); + + return handshake_status::abort; + } } else { - // Stop invalid handshake - server->stop (); - return; + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (remote_endpoint)); + + return handshake_status::abort; } } - process = true; + return handshake_status::handshake; // Handshake is in progress } void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) @@ -568,11 +614,13 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha { return; } + auto response = node->network.prepare_handshake_response (query, v2); auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response }; - // TODO: Use channel + node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", fmt::streamed (remote_endpoint)); + auto shared_const_buffer = handshake_response.to_shared_const_buffer (); socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { auto node = this_l->node.lock (); @@ -582,47 +630,53 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha } if (ec) { - node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint)); // Stop invalid handshake this_l->stop (); } else { - node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out); } }); } +/* + * handshake_message_visitor + */ + +void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message) +{ + result = server.process_handshake (message); +} + void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message) { - bootstrap = true; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message) { - bootstrap = true; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message) { - bootstrap = true; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message) { - bootstrap = true; + result = handshake_status::bootstrap; } /* - * Realtime + * realtime_message_visitor */ -nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor (nano::transport::tcp_server & server_a) : - server{ server_a } -{ -} - void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message) { process = true; @@ -684,7 +738,7 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const } /* - * Bootstrap + * bootstrap_message_visitor */ nano::transport::tcp_server::bootstrap_message_visitor::bootstrap_message_visitor (std::shared_ptr server) : @@ -769,6 +823,10 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const processed = true; } +/* + * + */ + // TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts, // and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup void nano::transport::tcp_server::timeout () @@ -780,7 +838,7 @@ void nano::transport::tcp_server::timeout () } if (socket->has_timed_out ()) { - node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint)); { nano::lock_guard lock{ node->tcp_listener->mutex }; @@ -834,7 +892,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection () ++node->tcp_listener->bootstrap_count; socket->type_set (nano::transport::socket::type_t::bootstrap); - node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", fmt::streamed (remote_endpoint)); return true; } @@ -846,17 +904,22 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & { return false; } - if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime) + if (node->flags.disable_tcp_realtime) + { + return false; + } + if (socket->type () != nano::transport::socket::type_t::undefined) { - remote_node_id = node_id; - ++node->tcp_listener->realtime_count; - socket->type_set (nano::transport::socket::type_t::realtime); + return false; + } - node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", nano::util::to_str (remote_endpoint)); + remote_node_id = node_id; + ++node->tcp_listener->realtime_count; + socket->type_set (nano::transport::socket::type_t::realtime); - return true; - } - return false; + node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint)); + + return true; } bool nano::transport::tcp_server::is_undefined_connection () const diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 0760241164..e8a220bc14 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -69,19 +69,23 @@ class tcp_server final : public std::enable_shared_from_this std::weak_ptr const node; nano::mutex mutex; std::atomic stopped{ false }; - std::atomic handshake_query_received{ false }; + std::atomic handshake_received{ false }; // Remote enpoint used to remove response channel even after socket closing nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{}; std::chrono::steady_clock::time_point last_telemetry_req{}; private: - void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + enum class process_result + { + abort, + progress, + pause, + }; void receive_message (); void received_message (std::unique_ptr message); - bool process_message (std::unique_ptr message); - + process_result process_message (std::unique_ptr message); void queue_realtime (std::unique_ptr message); bool to_bootstrap_connection (); @@ -90,19 +94,30 @@ class tcp_server final : public std::enable_shared_from_this bool is_bootstrap_connection () const; bool is_realtime_connection () const; + enum class handshake_status + { + abort, + handshake, + realtime, + bootstrap, + }; + + handshake_status process_handshake (nano::node_id_handshake const & message); + void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + private: bool const allow_bootstrap; std::shared_ptr message_deserializer; std::optional last_keepalive; -private: +private: // Visitors class handshake_message_visitor : public nano::message_visitor { public: - bool process{ false }; - bool bootstrap{ false }; + handshake_status result{ handshake_status::abort }; - explicit handshake_message_visitor (std::shared_ptr); + explicit handshake_message_visitor (tcp_server & server) : + server{ server } {}; void node_id_handshake (nano::node_id_handshake const &) override; void bulk_pull (nano::bulk_pull const &) override; @@ -111,7 +126,7 @@ class tcp_server final : public std::enable_shared_from_this void frontier_req (nano::frontier_req const &) override; private: - std::shared_ptr server; + tcp_server & server; }; class realtime_message_visitor : public nano::message_visitor @@ -119,7 +134,8 @@ class tcp_server final : public std::enable_shared_from_this public: bool process{ false }; - explicit realtime_message_visitor (tcp_server &); + explicit realtime_message_visitor (tcp_server & server) : + server{ server } {}; void keepalive (nano::keepalive const &) override; void publish (nano::publish const &) override; @@ -150,5 +166,7 @@ class tcp_server final : public std::enable_shared_from_this private: std::shared_ptr server; }; + + friend class handshake_message_visitor; }; }