From c7ab4eee0e89f78420460583794ee595ae68870d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 14 Mar 2024 19:35:48 +0100 Subject: [PATCH 01/13] Organize --- nano/node/transport/tcp.cpp | 58 ++++++++++++++++++------------------- nano/node/transport/tcp.hpp | 57 ++++++++++++++++-------------------- 2 files changed, 54 insertions(+), 61 deletions(-) diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 6dab450206..0ee13cf3db 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -131,6 +131,35 @@ nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function lock{ mutex }; + + message_manager.stop (); + + // Close all TCP sockets + for (auto const & channel : channels) + { + if (channel.socket) + { + channel.socket->close (); + } + // Remove response server + if (channel.response_server) + { + channel.response_server->stop (); + } + } + channels.clear (); +} + bool nano::transport::tcp_channels::insert (std::shared_ptr const & channel_a, std::shared_ptr const & socket_a, std::shared_ptr const & server_a) { auto endpoint (channel_a->get_tcp_endpoint ()); @@ -355,35 +384,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa } } -void nano::transport::tcp_channels::start () -{ - ongoing_keepalive (); - ongoing_merge (0); -} - -void nano::transport::tcp_channels::stop () -{ - stopped = true; - nano::unique_lock lock{ mutex }; - - message_manager.stop (); - - // Close all TCP sockets - for (auto const & channel : channels) - { - if (channel.socket) - { - channel.socket->close (); - } - // Remove response server - if (channel.response_server) - { - channel.response_server->stop (); - } - } - channels.clear (); -} - bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a) { if (node.flags.disable_max_peers_per_ip) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index fd588eb458..e24f662915 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -126,7 +126,11 @@ namespace transport friend class telemetry_simultaneous_requests_Test; public: - explicit tcp_channels (nano::node &, std::function const &)> = nullptr); + explicit tcp_channels (nano::node &, std::function const &)> sink = nullptr); + + void start (); + void stop (); + bool insert (std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); void erase (nano::tcp_endpoint const &); std::size_t size () const; @@ -138,8 +142,6 @@ namespace transport // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer (); void receive (); - void start (); - void stop (); void process_messages (); void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr const &); bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); @@ -166,40 +168,14 @@ namespace transport nano::tcp_message_manager message_manager; private: - class endpoint_tag - { - }; - class ip_address_tag - { - }; - class subnetwork_tag - { - }; - class random_access_tag - { - }; - class last_packet_sent_tag - { - }; - class last_bootstrap_attempt_tag - { - }; - class last_attempt_tag - { - }; - class node_id_tag - { - }; - class version_tag - { - }; - class channel_tcp_wrapper final { public: std::shared_ptr channel; std::shared_ptr socket; std::shared_ptr response_server; + + public: channel_tcp_wrapper (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a)) { @@ -234,6 +210,7 @@ namespace transport return channel->get_network_version (); } }; + class tcp_endpoint_attempt final { public: @@ -242,6 +219,7 @@ namespace transport boost::asio::ip::address subnetwork; std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () }; + public: explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) : endpoint (endpoint_a), address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())), @@ -249,7 +227,19 @@ namespace transport { } }; - mutable nano::mutex mutex; + + // clang-format off + class endpoint_tag {}; + class ip_address_tag {}; + class subnetwork_tag {}; + class random_access_tag {}; + class last_packet_sent_tag {}; + class last_bootstrap_attempt_tag {}; + class last_attempt_tag {}; + class node_id_tag {}; + class version_tag {}; + // clang-format on + // clang-format off boost::multi_index_container, mi::const_mem_fun>>> channels; + boost::multi_index_container, @@ -284,7 +275,9 @@ namespace transport private: std::function const &)> sink; + std::atomic stopped{ false }; + mutable nano::mutex mutex; friend class network_peer_max_tcp_attempts_subnetwork_Test; }; From 6c2bf461073b9c1f010b4ce49e88978436471ada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 14 Mar 2024 19:37:07 +0100 Subject: [PATCH 02/13] Renamings --- nano/node/transport/tcp.cpp | 6 +++--- nano/node/transport/tcp.hpp | 34 +++++++++++++++++----------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 0ee13cf3db..f78f1cf770 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -308,7 +308,7 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer () if (i->channel->get_network_version () >= node.network_params.network.protocol_version_min) { result = nano::transport::map_endpoint_to_tcp (i->channel->get_peering_endpoint ()); - channels.get ().modify (i, [] (channel_tcp_wrapper & wrapper_a) { + channels.get ().modify (i, [] (channel_entry & wrapper_a) { wrapper_a.channel->set_last_bootstrap_attempt (std::chrono::steady_clock::now ()); }); i = n; @@ -601,7 +601,7 @@ void nano::transport::tcp_channels::modify (std::shared_ptr ().find (channel_a->get_tcp_endpoint ())); if (existing != channels.get ().end ()) { - channels.get ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_tcp_wrapper & wrapper_a) { + channels.get ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_entry & wrapper_a) { modify_callback (wrapper_a.channel); }); } @@ -613,7 +613,7 @@ void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_ auto existing (channels.get ().find (endpoint_a)); if (existing != channels.get ().end ()) { - channels.get ().modify (existing, [] (channel_tcp_wrapper & wrapper_a) { + channels.get ().modify (existing, [] (channel_entry & wrapper_a) { wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ()); }); } diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index e24f662915..edde2816f7 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -168,7 +168,7 @@ namespace transport nano::tcp_message_manager message_manager; private: - class channel_tcp_wrapper final + class channel_entry final { public: std::shared_ptr channel; @@ -176,7 +176,7 @@ namespace transport std::shared_ptr response_server; public: - channel_tcp_wrapper (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : + channel_entry (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a)) { } @@ -211,7 +211,7 @@ namespace transport } }; - class tcp_endpoint_attempt final + class attempt_entry final { public: nano::tcp_endpoint endpoint; @@ -220,7 +220,7 @@ namespace transport std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () }; public: - explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) : + explicit attempt_entry (nano::tcp_endpoint const & endpoint_a) : endpoint (endpoint_a), address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())), subnetwork (nano::transport::map_address_to_subnetwork (endpoint_a.address ())) @@ -241,35 +241,35 @@ namespace transport // clang-format on // clang-format off - boost::multi_index_container>, mi::ordered_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::ordered_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::ordered_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_non_unique, - mi::const_mem_fun>>> + mi::const_mem_fun>>> channels; - boost::multi_index_container, - mi::member>, + mi::member>, mi::hashed_non_unique, - mi::member>, + mi::member>, mi::hashed_non_unique, - mi::member>, + mi::member>, mi::ordered_non_unique, - mi::member>>> + mi::member>>> attempts; // clang-format on From 30a23a250f2cb4920792459f0a8c6cda7c88f34d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 14 Mar 2024 20:02:29 +0100 Subject: [PATCH 03/13] Dedicated thread for tcp keepalives --- nano/lib/logging_enums.hpp | 1 + nano/lib/stats_enums.hpp | 1 + nano/lib/thread_roles.cpp | 3 ++ nano/lib/thread_roles.hpp | 1 + nano/node/transport/tcp.cpp | 64 ++++++++++++++++++++++++++++--------- nano/node/transport/tcp.hpp | 11 +++++-- 6 files changed, 64 insertions(+), 17 deletions(-) diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 822f73621c..a5782edd7e 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -56,6 +56,7 @@ enum class type tcp, tcp_server, tcp_listener, + tcp_channels, prunning, conf_processor_bounded, conf_processor_unbounded, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 68679b2c53..42f255d20f 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -24,6 +24,7 @@ enum class type : uint8_t http_callback, ipc, tcp, + tcp_channels, channel, socket, confirmation_height, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 6c66748d6b..bb65aeaa68 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::network_keepalive: thread_role_name_string = "Net keepalive"; break; + case nano::thread_role::name::tcp_keepalive: + thread_role_name_string = "Tcp keepalive"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 8fd4175ae9..2f5a47041d 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -47,6 +47,7 @@ enum class name rep_tiers, network_cleanup, network_keepalive, + tcp_keepalive, }; /* diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index f78f1cf770..94955ad377 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -131,16 +131,34 @@ nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function lock{ mutex }; + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + + if (keepalive_thread.joinable ()) + { + keepalive_thread.join (); + } message_manager.stop (); @@ -160,6 +178,26 @@ void nano::transport::tcp_channels::stop () channels.clear (); } +// TODO: Merge with keepalive in network class +void nano::transport::tcp_channels::run_keepalive () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, node.network_params.network.keepalive_period); + if (stopped) + { + return; + } + lock.unlock (); + + node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::loop_keepalive); + keepalive (); + + lock.lock (); + } +} + bool nano::transport::tcp_channels::insert (std::shared_ptr const & channel_a, std::shared_ptr const & socket_a, std::shared_ptr const & server_a) { auto endpoint (channel_a->get_tcp_endpoint ()); @@ -485,33 +523,29 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point channels.get ().erase (channels.get ().begin (), lower_bound); } -void nano::transport::tcp_channels::ongoing_keepalive () +void nano::transport::tcp_channels::keepalive () { nano::keepalive message{ node.network_params.network }; node.network.random_fill (message.peers); + nano::unique_lock lock{ mutex }; + + auto const cutoff_time = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period; + // Wake up channels std::vector> send_list; - auto keepalive_sent_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.keepalive_period)); + auto keepalive_sent_cutoff (channels.get ().lower_bound (cutoff_time)); for (auto i (channels.get ().begin ()); i != keepalive_sent_cutoff; ++i) { send_list.push_back (i->channel); } + lock.unlock (); + for (auto & channel : send_list) { channel->send (message); } - std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () { - if (auto node_l = node_w.lock ()) - { - if (!node_l->network.tcp_channels.stopped) - { - node_l->network.tcp_channels.ongoing_keepalive (); - } - } - }); } void nano::transport::tcp_channels::ongoing_merge (size_t channel_index) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index edde2816f7..d2f799b501 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -11,6 +11,7 @@ #include #include +#include #include namespace mi = boost::multi_index; @@ -124,9 +125,11 @@ namespace transport { friend class nano::transport::channel_tcp; friend class telemetry_simultaneous_requests_Test; + friend class network_peer_max_tcp_attempts_subnetwork_Test; public: explicit tcp_channels (nano::node &, std::function const &)> sink = nullptr); + ~tcp_channels (); void start (); void stop (); @@ -164,6 +167,10 @@ namespace transport private: // Dependencies nano::node & node; + private: + void run_keepalive (); + void keepalive (); + public: nano::tcp_message_manager message_manager; @@ -277,9 +284,9 @@ namespace transport std::function const &)> sink; std::atomic stopped{ false }; + nano::condition_variable condition; mutable nano::mutex mutex; - - friend class network_peer_max_tcp_attempts_subnetwork_Test; + std::thread keepalive_thread; }; } // namespace transport } // namespace nano From c60a768e8439cb21033332129bd4d5e79f8a8aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 15 Mar 2024 20:00:33 +0100 Subject: [PATCH 04/13] Rename 'reachout' to `track_reachout` --- nano/core_test/network.cpp | 8 ++++---- nano/core_test/peer_container.cpp | 10 +++++----- nano/node/network.cpp | 8 ++++---- nano/node/network.hpp | 6 +++--- nano/node/node.cpp | 2 +- nano/node/transport/tcp.cpp | 2 +- nano/node/transport/tcp.hpp | 4 ++-- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index bf9ed2e651..976dc12633 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -759,7 +759,7 @@ TEST (network, peer_max_tcp_attempts) node->network.merge_peer (node2->network.endpoint ()); } ASSERT_EQ (0, node->network.size ()); - ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ()))); + ASSERT_TRUE (node->network.tcp_channels.track_reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ()))); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out)); } #endif @@ -779,11 +779,11 @@ namespace transport { auto address (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x7f000001 + i))); // 127.0.0.1 hex nano::endpoint endpoint (address, system.get_available_port ()); - ASSERT_FALSE (node->network.tcp_channels.reachout (endpoint)); + ASSERT_FALSE (node->network.tcp_channels.track_reachout (endpoint)); } ASSERT_EQ (0, node->network.size ()); ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out)); - ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ()))); + ASSERT_TRUE (node->network.tcp_channels.track_reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ()))); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out)); } } @@ -974,7 +974,7 @@ TEST (network, tcp_no_connect_excluded_peers) ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ())); // Should not actively reachout to excluded peers - ASSERT_TRUE (node0->network.reachout (node1->network.endpoint (), true)); + ASSERT_TRUE (node0->network.track_reachout (node1->network.endpoint ())); // Erasing from excluded peers should allow a connection node0->network.excluded_peers.remove (endpoint1_tcp); diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 6518cca749..7639d3ad91 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -217,18 +217,18 @@ TEST (peer_container, reachout) auto outer_node1 = nano::test::add_outer_node (system); ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node1->network.endpoint ())); // Make sure having been contacted by them already indicates we shouldn't reach out - ASSERT_TRUE (node1.network.reachout (outer_node1->network.endpoint ())); + ASSERT_TRUE (node1.network.track_reachout (outer_node1->network.endpoint ())); auto outer_node2 = nano::test::add_outer_node (system); - ASSERT_FALSE (node1.network.reachout (outer_node2->network.endpoint ())); + ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node2->network.endpoint ())); // Reaching out to them once should signal we shouldn't reach out again. - ASSERT_TRUE (node1.network.reachout (outer_node2->network.endpoint ())); + ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); // Make sure we don't purge new items node1.network.cleanup (std::chrono::steady_clock::now () - std::chrono::seconds (10)); - ASSERT_TRUE (node1.network.reachout (outer_node2->network.endpoint ())); + ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); // Make sure we purge old items node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10)); - ASSERT_FALSE (node1.network.reachout (outer_node2->network.endpoint ())); + ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); } // This test is similar to network.filter_invalid_version_using with the difference that diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 5287391336..71ef05e7dd 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -411,7 +411,7 @@ void nano::network::merge_peers (std::array const & peers_a) void nano::network::merge_peer (nano::endpoint const & peer_a) { - if (!reachout (peer_a, node.config.allow_local_peers)) + if (!track_reachout (peer_a)) { std::weak_ptr node_w (node.shared ()); node.network.tcp_channels.start_tcp (peer_a); @@ -436,13 +436,13 @@ bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_lo return result; } -bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers) +bool nano::network::track_reachout (nano::endpoint const & endpoint_a) { // Don't contact invalid IPs - bool error = not_a_peer (endpoint_a, allow_local_peers); + bool error = not_a_peer (endpoint_a, node.config.allow_local_peers); if (!error) { - error = tcp_channels.reachout (endpoint_a); + error = tcp_channels.track_reachout (endpoint_a); } return error; } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index d51aedaa48..1e9ab3d734 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -78,9 +78,9 @@ class network final void send_keepalive_self (std::shared_ptr const &); std::shared_ptr find_node_id (nano::account const &); std::shared_ptr find_channel (nano::endpoint const &); - bool not_a_peer (nano::endpoint const &, bool); - // Should we reach out to this endpoint with a keepalive message - bool reachout (nano::endpoint const &, bool = false); + bool not_a_peer (nano::endpoint const &, bool allow_local_peers); + // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt + bool track_reachout (nano::endpoint const &); std::deque> list (std::size_t max_count = 0, uint8_t = 0, bool = true); std::deque> list_non_pr (std::size_t); // Desired fanout for a given scale diff --git a/nano/node/node.cpp b/nano/node/node.cpp index b0de6bc931..fe588244ec 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1120,7 +1120,7 @@ void nano::node::add_initial_peers () for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i) { nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); - if (!network.reachout (endpoint, config.allow_local_peers)) + if (!network.track_reachout (endpoint)) { network.tcp_channels.start_tcp (endpoint); } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 94955ad377..b062bdc985 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -469,7 +469,7 @@ bool nano::transport::tcp_channels::max_ip_or_subnetwork_connections (nano::tcp_ return max_ip_connections (endpoint_a) || max_subnetwork_connections (endpoint_a); } -bool nano::transport::tcp_channels::reachout (nano::endpoint const & endpoint_a) +bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpoint_a) { auto tcp_endpoint (nano::transport::map_endpoint_to_tcp (endpoint_a)); // Don't overload single IP diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index d2f799b501..d49ef9d452 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -150,8 +150,8 @@ namespace transport bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); - // Should we reach out to this endpoint with a keepalive message - bool reachout (nano::endpoint const &); + // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt + bool track_reachout (nano::endpoint const &); std::unique_ptr collect_container_info (std::string const &); void purge (std::chrono::steady_clock::time_point const &); void ongoing_keepalive (); From ce31080e53681c208a4c6f552e8245ec76d1bd13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 15 Mar 2024 20:56:32 +0100 Subject: [PATCH 05/13] Dedicated network reachout thread --- nano/lib/stats_enums.hpp | 6 ++- nano/lib/thread_roles.cpp | 3 ++ nano/lib/thread_roles.hpp | 1 + nano/node/network.cpp | 55 +++++++++++++++++--- nano/node/network.hpp | 2 + nano/node/transport/tcp.cpp | 80 ++++++------------------------ nano/node/transport/tcp.hpp | 8 +-- nano/node/transport/tcp_server.cpp | 8 +++ nano/node/transport/tcp_server.hpp | 7 +-- 9 files changed, 93 insertions(+), 77 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 42f255d20f..de7b6d020c 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -71,7 +71,6 @@ enum class detail : uint8_t ok, loop, loop_cleanup, - loop_keepalive, total, process, processed, @@ -217,6 +216,11 @@ enum class detail : uint8_t message_size_too_big, outdated_version, + // network + loop_keepalive, + loop_reachout, + merge_peer, + // tcp tcp_accept_success, tcp_accept_failure, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index bb65aeaa68..666558d583 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::network_keepalive: thread_role_name_string = "Net keepalive"; break; + case nano::thread_role::name::network_reachout: + thread_role_name_string = "Net reachout"; + break; case nano::thread_role::name::tcp_keepalive: thread_role_name_string = "Tcp keepalive"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 2f5a47041d..4afed7e1ef 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -47,6 +47,7 @@ enum class name rep_tiers, network_cleanup, network_keepalive, + network_reachout, tcp_keepalive, }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 71ef05e7dd..ec1df815ff 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -48,6 +48,11 @@ void nano::network::start () run_keepalive (); }); + reachout_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::network_reachout); + run_reachout (); + }); + if (!node.flags.disable_tcp_realtime) { tcp_channels.start (); @@ -87,6 +92,10 @@ void nano::network::stop () { cleanup_thread.join (); } + if (reachout_thread.joinable ()) + { + reachout_thread.join (); + } port = 0; } @@ -126,12 +135,11 @@ void nano::network::run_cleanup () while (!stopped) { condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s); - lock.unlock (); - if (stopped) { return; } + lock.unlock (); node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup); @@ -154,12 +162,11 @@ void nano::network::run_keepalive () while (!stopped) { condition.wait_for (lock, node.network_params.network.keepalive_period); - lock.unlock (); - if (stopped) { return; } + lock.unlock (); node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive); @@ -170,6 +177,41 @@ void nano::network::run_keepalive () } } +void nano::network::run_reachout () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, node.network_params.network.merge_period); + if (stopped) + { + return; + } + lock.unlock (); + + node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_reachout); + + auto keepalive = tcp_channels.sample_keepalive (); + if (keepalive) + { + for (auto const & peer : keepalive->peers) + { + if (stopped) + { + return; + } + + merge_peer (peer); + + // Throttle reachout attempts + std::this_thread::sleep_for (node.network_params.network.merge_period); + } + } + + lock.lock (); + } +} + void nano::network::send_keepalive (std::shared_ptr const & channel_a) { nano::keepalive message{ node.network_params.network }; @@ -413,8 +455,9 @@ void nano::network::merge_peer (nano::endpoint const & peer_a) { if (!track_reachout (peer_a)) { - std::weak_ptr node_w (node.shared ()); - node.network.tcp_channels.start_tcp (peer_a); + node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer); + + tcp_channels.start_tcp (peer_a); } } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 1e9ab3d734..c10ebc5ccb 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -111,6 +111,7 @@ class network final void run_processing (); void run_cleanup (); void run_keepalive (); + void run_reachout (); void process_message (nano::message const &, std::shared_ptr const &); private: // Dependencies @@ -137,6 +138,7 @@ class network final std::vector processing_threads; // Using boost::thread to enable increased stack size std::thread cleanup_thread; std::thread keepalive_thread; + std::thread reachout_thread; public: static unsigned const broadcast_interval_ms = 10; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index b062bdc985..7d6bbe4a94 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -139,8 +139,6 @@ nano::transport::tcp_channels::~tcp_channels () void nano::transport::tcp_channels::start () { - ongoing_merge (0); - keepalive_thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::tcp_keepalive); run_keepalive (); @@ -548,75 +546,29 @@ void nano::transport::tcp_channels::keepalive () } } -void nano::transport::tcp_channels::ongoing_merge (size_t channel_index) +std::optional nano::transport::tcp_channels::sample_keepalive () { - nano::unique_lock lock{ mutex }; - std::optional keepalive; - size_t count = 0; - while (!keepalive && channels.size () > 0 && count++ < channels.size ()) + nano::lock_guard lock{ mutex }; + + auto next_rand = [this] (std::size_t max) { + std::uniform_int_distribution dist (0, max - 1); + return dist (rng); + }; + + size_t counter = 0; + while (counter++ < channels.size ()) { - ++channel_index; - if (channels.size () <= channel_index) - { - channel_index = 0; - } - auto server = channels.get ()[channel_index].response_server; - if (server && server->last_keepalive) + auto index = next_rand (channels.size ()); + if (auto server = channels.get ()[index].response_server) { - keepalive = std::move (server->last_keepalive); - server->last_keepalive = std::nullopt; - } - } - lock.unlock (); - if (keepalive) - { - ongoing_merge (channel_index, *keepalive, 1); - } - else - { - std::weak_ptr node_w = node.shared (); - node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () { - if (auto node_l = node_w.lock ()) + if (auto keepalive = server->pop_last_keepalive ()) { - if (!node_l->network.tcp_channels.stopped) - { - node_l->network.tcp_channels.ongoing_merge (channel_index); - } + return keepalive; } - }); + } } -} -void nano::transport::tcp_channels::ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index) -{ - debug_assert (peer_index < keepalive.peers.size ()); - node.network.merge_peer (keepalive.peers[peer_index++]); - if (peer_index < keepalive.peers.size ()) - { - std::weak_ptr node_w = node.shared (); - node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index, keepalive, peer_index] () { - if (auto node_l = node_w.lock ()) - { - if (!node_l->network.tcp_channels.stopped) - { - node_l->network.tcp_channels.ongoing_merge (channel_index, keepalive, peer_index); - } - } - }); - } - else - { - std::weak_ptr node_w = node.shared (); - node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () { - if (auto node_l = node_w.lock ()) - { - if (!node_l->network.tcp_channels.stopped) - { - node_l->network.tcp_channels.ongoing_merge (channel_index); - } - } - }); - } + return std::nullopt; } void nano::transport::tcp_channels::list (std::deque> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index d49ef9d452..005ccb0429 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -154,12 +155,11 @@ namespace transport bool track_reachout (nano::endpoint const &); std::unique_ptr collect_container_info (std::string const &); void purge (std::chrono::steady_clock::time_point const &); - void ongoing_keepalive (); - void ongoing_merge (size_t channel_index); - void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index); void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr const &, std::function const &)>); void update (nano::tcp_endpoint const &); + std::optional sample_keepalive (); + // Connection start void start_tcp (nano::endpoint const &); void start_tcp_receive_node_id (std::shared_ptr const &, nano::endpoint const &, std::shared_ptr> const &); @@ -287,6 +287,8 @@ namespace transport nano::condition_variable condition; mutable nano::mutex mutex; std::thread keepalive_thread; + + std::default_random_engine rng; }; } // namespace transport } // namespace nano diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index edd7f9663a..bcad6d5b0b 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -796,6 +796,14 @@ void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & me } } +std::optional nano::transport::tcp_server::pop_last_keepalive () +{ + std::lock_guard lock{ mutex }; + auto result = last_keepalive; + last_keepalive = std::nullopt; + return result; +} + bool nano::transport::tcp_server::to_bootstrap_connection () { auto node = this->node.lock (); diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 5369258545..0760241164 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -63,6 +63,7 @@ class tcp_server final : public std::enable_shared_from_this void timeout (); void set_last_keepalive (nano::keepalive const & message); + std::optional pop_last_keepalive (); std::shared_ptr const socket; std::weak_ptr const node; @@ -73,7 +74,6 @@ class tcp_server final : public std::enable_shared_from_this nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{}; std::chrono::steady_clock::time_point last_telemetry_req{}; - std::optional last_keepalive; private: void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); @@ -90,9 +90,10 @@ class tcp_server final : public std::enable_shared_from_this bool is_bootstrap_connection () const; bool is_realtime_connection () const; +private: + bool const allow_bootstrap; std::shared_ptr message_deserializer; - - bool allow_bootstrap; + std::optional last_keepalive; private: class handshake_message_visitor : public nano::message_visitor From f46673ee16b5aa678c859e2d4c4e7647450b225d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 15 Mar 2024 21:13:44 +0100 Subject: [PATCH 06/13] Merge tcp channels and network keepalive loops --- nano/lib/thread_roles.cpp | 3 --- nano/lib/thread_roles.hpp | 1 - nano/node/network.cpp | 2 ++ nano/node/transport/tcp.cpp | 32 +------------------------------- nano/node/transport/tcp.hpp | 6 +----- 5 files changed, 4 insertions(+), 40 deletions(-) diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 666558d583..7c75351c4e 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -118,9 +118,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::network_reachout: thread_role_name_string = "Net reachout"; break; - case nano::thread_role::name::tcp_keepalive: - thread_role_name_string = "Tcp keepalive"; - break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 4afed7e1ef..a00a552262 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -48,7 +48,6 @@ enum class name network_cleanup, network_keepalive, network_reachout, - tcp_keepalive, }; /* diff --git a/nano/node/network.cpp b/nano/node/network.cpp index ec1df815ff..845debd39c 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -173,6 +173,8 @@ void nano::network::run_keepalive () flood_keepalive (0.75f); flood_keepalive_self (0.25f); + tcp_channels.keepalive (); + lock.lock (); } } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 7d6bbe4a94..63776946a1 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -133,16 +133,11 @@ nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function lock{ mutex }; - while (!stopped) - { - condition.wait_for (lock, node.network_params.network.keepalive_period); - if (stopped) - { - return; - } - lock.unlock (); - - node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::loop_keepalive); - keepalive (); - - lock.lock (); - } -} - bool nano::transport::tcp_channels::insert (std::shared_ptr const & channel_a, std::shared_ptr const & socket_a, std::shared_ptr const & server_a) { auto endpoint (channel_a->get_tcp_endpoint ()); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 005ccb0429..252f465c50 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -158,6 +158,7 @@ namespace transport void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr const &, std::function const &)>); void update (nano::tcp_endpoint const &); + void keepalive (); std::optional sample_keepalive (); // Connection start @@ -167,10 +168,6 @@ namespace transport private: // Dependencies nano::node & node; - private: - void run_keepalive (); - void keepalive (); - public: nano::tcp_message_manager message_manager; @@ -286,7 +283,6 @@ namespace transport std::atomic stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex; - std::thread keepalive_thread; std::default_random_engine rng; }; From 1c52c24f06e64228db2a02c7466a315bf4546641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 10:02:02 +0100 Subject: [PATCH 07/13] Unused --- nano/node/transport/tcp.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 252f465c50..55ed88f7a2 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -145,7 +145,6 @@ namespace transport std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer (); - void receive (); void process_messages (); void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr const &); bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); From db5f7850cec805dd462767365f5b13ab270ad36b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 14:24:41 +0100 Subject: [PATCH 08/13] Random number generator helper --- nano/lib/random.hpp | 31 +++++++++++++++++++++++++++++++ nano/node/transport/tcp.cpp | 11 ++--------- nano/node/transport/tcp.hpp | 3 ++- 3 files changed, 35 insertions(+), 10 deletions(-) create mode 100644 nano/lib/random.hpp diff --git a/nano/lib/random.hpp b/nano/lib/random.hpp new file mode 100644 index 0000000000..f2f35a7e2e --- /dev/null +++ b/nano/lib/random.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include + +namespace nano +{ +/** + * Not safe for any crypto related code, use for non-crypto PRNG only. + */ +class random_generator final +{ +public: + /// Generate a random number in the range [min, max) + auto random (auto min, auto max) + { + release_assert (min < max); + std::uniform_int_distribution dist (min, max - 1); + return dist (rng); + } + + /// Generate a random number in the range [0, max) + auto random (auto max) + { + return random (decltype (max){ 0 }, max); + } + +private: + std::random_device device; + std::default_random_engine rng{ device () }; +}; +} \ No newline at end of file diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 63776946a1..c8acf552dd 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -224,15 +224,13 @@ std::unordered_set> nano::transport::t nano::lock_guard lock{ mutex }; // Stop trying to fill result with random samples after this many attempts auto random_cutoff (count_a * 2); - auto peers_size (channels.size ()); // Usually count_a will be much smaller than peers.size() // Otherwise make sure we have a cutoff on attempting to randomly fill if (!channels.empty ()) { for (auto i (0); i < random_cutoff && result.size () < count_a; ++i) { - auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); - + auto index = rng.random (channels.size ()); auto channel = channels.get ()[index].channel; if (!channel->alive ()) { @@ -520,15 +518,10 @@ std::optional nano::transport::tcp_channels::sample_keepalive ( { nano::lock_guard lock{ mutex }; - auto next_rand = [this] (std::size_t max) { - std::uniform_int_distribution dist (0, max - 1); - return dist (rng); - }; - size_t counter = 0; while (counter++ < channels.size ()) { - auto index = next_rand (channels.size ()); + auto index = rng.random (channels.size ()); if (auto server = channels.get ()[index].response_server) { if (auto keepalive = server->pop_last_keepalive ()) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 55ed88f7a2..a362ac5399 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -283,7 +284,7 @@ namespace transport nano::condition_variable condition; mutable nano::mutex mutex; - std::default_random_engine rng; + mutable nano::random_generator rng; }; } // namespace transport } // namespace nano From 653bee155bea0865f3241e993e2a2f5f6f21a166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 14:27:53 +0100 Subject: [PATCH 09/13] Properly close channel container --- nano/node/transport/tcp.cpp | 9 ++++++++- nano/node/transport/tcp.hpp | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index c8acf552dd..bc86b4cbc0 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -150,7 +150,13 @@ void nano::transport::tcp_channels::stop () message_manager.stop (); - // Close all TCP sockets + close (); +} + +void nano::transport::tcp_channels::close () +{ + nano::lock_guard lock{ mutex }; + for (auto const & channel : channels) { if (channel.socket) @@ -163,6 +169,7 @@ void nano::transport::tcp_channels::stop () channel.response_server->stop (); } } + channels.clear (); } diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index a362ac5399..e0ae674843 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -171,6 +171,9 @@ namespace transport public: nano::tcp_message_manager message_manager; + private: + void close (); + private: class channel_entry final { From fb0bf768a9642ffbf586f24e9bfe64bc4923358b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 15:31:47 +0100 Subject: [PATCH 10/13] Rework channel purging --- nano/core_test/network.cpp | 2 +- nano/core_test/peer_container.cpp | 1 + nano/core_test/socket.cpp | 4 +- nano/node/transport/channel.hpp | 3 ++ nano/node/transport/fake.hpp | 2 +- nano/node/transport/inproc.hpp | 5 ++ nano/node/transport/tcp.cpp | 79 +++++++++++++++++++------------ nano/node/transport/tcp.hpp | 35 +++++++------- 8 files changed, 80 insertions(+), 51 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 976dc12633..9ffa478f91 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1080,7 +1080,7 @@ TEST (network, cleanup_purge) ASSERT_EQ (1, node1.network.size ()); node1.network.cleanup (std::chrono::steady_clock::now ()); - ASSERT_EQ (0, node1.network.size ()); + ASSERT_TIMELY_EQ (5s, 0, node1.network.size ()); } TEST (network, loopback_channel) diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 7639d3ad91..dd6b69edc2 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -228,6 +228,7 @@ TEST (peer_container, reachout) ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); // Make sure we purge old items node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10)); + ASSERT_TIMELY (5s, node1.network.empty ()); ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); } diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index eaa31f761b..1ec356391b 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -420,7 +420,7 @@ TEST (socket, drop_policy) }); auto client = std::make_shared (*node); - nano::transport::channel_tcp channel{ *node, client }; + auto channel = std::make_shared (*node, client); nano::test::counted_completion write_completion (static_cast (total_message_count)); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()), @@ -428,7 +428,7 @@ TEST (socket, drop_policy) for (int i = 0; i < total_message_count; i++) { std::vector buff (1); - channel.send_buffer ( + channel->send_buffer ( nano::shared_const_buffer (std::move (buff)), [&write_completion, client] (boost::system::error_code const & ec, size_t size_a) mutable { client.reset (); write_completion.increment (); diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index bede756ce3..35f8f37bab 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -41,6 +41,8 @@ class channel nano::transport::traffic_type = nano::transport::traffic_type::generic) = 0; + virtual void close () = 0; + virtual std::string to_string () const = 0; virtual nano::endpoint get_endpoint () const = 0; virtual nano::tcp_endpoint get_tcp_endpoint () const = 0; @@ -50,6 +52,7 @@ class channel { return false; } + virtual bool alive () const { return true; diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index 809c5b98ae..5c720158bd 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -49,7 +49,7 @@ namespace transport return nano::transport::transport_type::fake; } - void close () + void close () override { closed = true; } diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index c6012bc1a1..fc318ef51f 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -43,6 +43,11 @@ namespace transport return nano::transport::transport_type::loopback; } + void close () override + { + // Can't be closed + } + private: nano::node & destination; nano::endpoint const endpoint; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index bc86b4cbc0..49717856d8 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -54,12 +54,12 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type))) { socket_l->async_write ( - buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) { + buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { if (auto node_l = node.lock ()) { if (!ec) { - node_l->network.tcp_channels.update (endpoint_a); + this_s->set_last_packet_sent (std::chrono::steady_clock::now ()); } if (ec == boost::system::errc::host_unreachable) { @@ -475,25 +475,52 @@ std::unique_ptr nano::transport::tcp_channels::c return composite; } -void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a) +void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline) { nano::lock_guard lock{ mutex }; - // Remove channels with dead underlying sockets - erase_if (channels, [] (auto const & entry) { - return !entry.channel->alive (); - }); + node.logger.debug (nano::log::type::tcp_channels, "Performing periodic channel cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff_deadline)); + + auto should_close = [this, cutoff_deadline] (auto const & channel) { + // Remove channels that haven't successfully sent a message within the cutoff time + if (auto last = channel->get_last_packet_sent (); last < cutoff_deadline) + { + node.logger.debug (nano::log::type::tcp_channels, "Closing idle channel: {} (idle for {}s)", + channel->to_string (), + nano::log::seconds_delta (last)); + + return true; // Close + } + // Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations + if (channel->get_network_version () < node.network_params.network.protocol_version_min) + { + node.logger.debug (nano::log::type::tcp_channels, "Closing channel with old protocol version: {}", channel->to_string ()); + + return true; // Close + } + return false; + }; - auto disconnect_cutoff (channels.get ().lower_bound (cutoff_a)); - channels.get ().erase (channels.get ().begin (), disconnect_cutoff); + for (auto const & entry : channels) + { + if (should_close (entry.channel)) + { + entry.channel->close (); + } + } + + erase_if (channels, [this] (auto const & entry) { + if (!entry.channel->alive ()) + { + node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ()); + return true; // Erase + } + return false; + }); // Remove keepalive attempt tracking for attempts older than cutoff - auto attempts_cutoff (attempts.get ().lower_bound (cutoff_a)); + auto attempts_cutoff (attempts.get ().lower_bound (cutoff_deadline)); attempts.get ().erase (attempts.get ().begin (), attempts_cutoff); - - // Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations - auto lower_bound = channels.get ().lower_bound (node.network_params.network.protocol_version_min); - channels.get ().erase (channels.get ().begin (), lower_bound); } void nano::transport::tcp_channels::keepalive () @@ -506,16 +533,18 @@ void nano::transport::tcp_channels::keepalive () auto const cutoff_time = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period; // Wake up channels - std::vector> send_list; - auto keepalive_sent_cutoff (channels.get ().lower_bound (cutoff_time)); - for (auto i (channels.get ().begin ()); i != keepalive_sent_cutoff; ++i) + std::vector> to_wakeup; + for (auto const & entry : channels) { - send_list.push_back (i->channel); + if (entry.channel->get_last_packet_sent () < cutoff_time) + { + to_wakeup.push_back (entry.channel); + } } lock.unlock (); - for (auto & channel : send_list) + for (auto & channel : to_wakeup) { channel->send (message); } @@ -563,18 +592,6 @@ void nano::transport::tcp_channels::modify (std::shared_ptr lock{ mutex }; - auto existing (channels.get ().find (endpoint_a)); - if (existing != channels.get ().end ()) - { - channels.get ().modify (existing, [] (channel_entry & wrapper_a) { - wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ()); - }); - } -} - void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a) { auto socket = std::make_shared (node); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index e0ae674843..0402b915af 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -55,7 +55,7 @@ namespace transport class tcp_server; class tcp_channels; - class channel_tcp : public nano::transport::channel + class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this { friend class nano::transport::tcp_channels; @@ -74,10 +74,6 @@ namespace transport { return &node == &other_a.node && socket.lock () == other_a.socket.lock (); } - std::weak_ptr socket; - /* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server. - If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */ - std::atomic temporary{ false }; void set_endpoint (); @@ -97,7 +93,7 @@ namespace transport return nano::transport::transport_type::tcp; } - virtual bool max (nano::transport::traffic_type traffic_type) override + bool max (nano::transport::traffic_type traffic_type) override { bool result = true; if (auto socket_l = socket.lock ()) @@ -107,7 +103,7 @@ namespace transport return result; } - virtual bool alive () const override + bool alive () const override { if (auto socket_l = socket.lock ()) { @@ -116,6 +112,21 @@ namespace transport return false; } + void close () override + { + if (auto socket_l = socket.lock ()) + { + socket_l->close (); + } + } + + public: + std::weak_ptr socket; + + /* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server. + If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */ + std::atomic temporary{ false }; + private: nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 }; @@ -154,10 +165,9 @@ namespace transport // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt bool track_reachout (nano::endpoint const &); std::unique_ptr collect_container_info (std::string const &); - void purge (std::chrono::steady_clock::time_point const &); + void purge (std::chrono::steady_clock::time_point cutoff_deadline); void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr const &, std::function const &)>); - void update (nano::tcp_endpoint const &); void keepalive (); std::optional sample_keepalive (); @@ -191,10 +201,6 @@ namespace transport { return channel->get_tcp_endpoint (); } - std::chrono::steady_clock::time_point last_packet_sent () const - { - return channel->get_last_packet_sent (); - } std::chrono::steady_clock::time_point last_bootstrap_attempt () const { return channel->get_last_bootstrap_attempt (); @@ -240,7 +246,6 @@ namespace transport class ip_address_tag {}; class subnetwork_tag {}; class random_access_tag {}; - class last_packet_sent_tag {}; class last_bootstrap_attempt_tag {}; class last_attempt_tag {}; class node_id_tag {}; @@ -257,8 +262,6 @@ namespace transport mi::const_mem_fun>, mi::hashed_non_unique, mi::const_mem_fun>, - mi::ordered_non_unique, - mi::const_mem_fun>, mi::ordered_non_unique, mi::const_mem_fun>, mi::hashed_non_unique, From fffc74693cb402ebf1c2c2c88e681285ae3813d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 18:06:07 +0100 Subject: [PATCH 11/13] Reverse track_reachout return value --- nano/core_test/network.cpp | 8 ++++---- nano/core_test/peer_container.cpp | 10 +++++----- nano/node/network.cpp | 9 ++++----- nano/node/node.cpp | 2 +- nano/node/transport/tcp.cpp | 31 ++++++++++++++++++++++--------- 5 files changed, 36 insertions(+), 24 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 9ffa478f91..cf5188bc55 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -759,7 +759,7 @@ TEST (network, peer_max_tcp_attempts) node->network.merge_peer (node2->network.endpoint ()); } ASSERT_EQ (0, node->network.size ()); - ASSERT_TRUE (node->network.tcp_channels.track_reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ()))); + ASSERT_FALSE (node->network.tcp_channels.track_reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ()))); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out)); } #endif @@ -779,11 +779,11 @@ namespace transport { auto address (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x7f000001 + i))); // 127.0.0.1 hex nano::endpoint endpoint (address, system.get_available_port ()); - ASSERT_FALSE (node->network.tcp_channels.track_reachout (endpoint)); + ASSERT_TRUE (node->network.tcp_channels.track_reachout (endpoint)); } ASSERT_EQ (0, node->network.size ()); ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out)); - ASSERT_TRUE (node->network.tcp_channels.track_reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ()))); + ASSERT_FALSE (node->network.tcp_channels.track_reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ()))); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out)); } } @@ -974,7 +974,7 @@ TEST (network, tcp_no_connect_excluded_peers) ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ())); // Should not actively reachout to excluded peers - ASSERT_TRUE (node0->network.track_reachout (node1->network.endpoint ())); + ASSERT_FALSE (node0->network.track_reachout (node1->network.endpoint ())); // Erasing from excluded peers should allow a connection node0->network.excluded_peers.remove (endpoint1_tcp); diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index dd6b69edc2..c0f04d9d91 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -217,19 +217,19 @@ TEST (peer_container, reachout) auto outer_node1 = nano::test::add_outer_node (system); ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node1->network.endpoint ())); // Make sure having been contacted by them already indicates we shouldn't reach out - ASSERT_TRUE (node1.network.track_reachout (outer_node1->network.endpoint ())); + ASSERT_FALSE (node1.network.track_reachout (outer_node1->network.endpoint ())); auto outer_node2 = nano::test::add_outer_node (system); - ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); + ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node2->network.endpoint ())); // Reaching out to them once should signal we shouldn't reach out again. - ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); + ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); // Make sure we don't purge new items node1.network.cleanup (std::chrono::steady_clock::now () - std::chrono::seconds (10)); - ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); + ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); // Make sure we purge old items node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10)); ASSERT_TIMELY (5s, node1.network.empty ()); - ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); + ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); } // This test is similar to network.filter_invalid_version_using with the difference that diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 845debd39c..95381bcad8 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -455,7 +455,7 @@ void nano::network::merge_peers (std::array const & peers_a) void nano::network::merge_peer (nano::endpoint const & peer_a) { - if (!track_reachout (peer_a)) + if (track_reachout (peer_a)) { node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer); @@ -484,12 +484,11 @@ bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_lo bool nano::network::track_reachout (nano::endpoint const & endpoint_a) { // Don't contact invalid IPs - bool error = not_a_peer (endpoint_a, node.config.allow_local_peers); - if (!error) + if (not_a_peer (endpoint_a, node.config.allow_local_peers)) { - error = tcp_channels.track_reachout (endpoint_a); + return false; } - return error; + return tcp_channels.track_reachout (endpoint_a); } std::deque> nano::network::list (std::size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index fe588244ec..c3dde6cfee 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1120,7 +1120,7 @@ void nano::node::add_initial_peers () for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i) { nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); - if (!network.track_reachout (endpoint)) + if (network.track_reachout (endpoint)) { network.tcp_channels.start_tcp (endpoint); } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 49717856d8..ac7ba411f5 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -444,18 +444,31 @@ bool nano::transport::tcp_channels::max_ip_or_subnetwork_connections (nano::tcp_ bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpoint_a) { - auto tcp_endpoint (nano::transport::map_endpoint_to_tcp (endpoint_a)); + auto const tcp_endpoint = nano::transport::map_endpoint_to_tcp (endpoint_a); + // Don't overload single IP - bool error = node.network.excluded_peers.check (tcp_endpoint) || max_ip_or_subnetwork_connections (tcp_endpoint); - if (!error && !node.flags.disable_tcp_realtime) + if (max_ip_or_subnetwork_connections (tcp_endpoint)) { - // Don't keepalive to nodes that already sent us something - error |= find_channel (tcp_endpoint) != nullptr; - nano::lock_guard lock{ mutex }; - auto inserted (attempts.emplace (tcp_endpoint)); - error |= !inserted.second; + return false; } - return error; + if (node.network.excluded_peers.check (tcp_endpoint)) + { + return false; + } + if (node.flags.disable_tcp_realtime) + { + return false; + } + + // Don't keepalive to nodes that already sent us something + if (find_channel (tcp_endpoint) != nullptr) + { + return false; + } + + nano::lock_guard lock{ mutex }; + auto [it, inserted] = attempts.emplace (tcp_endpoint); + return inserted; } std::unique_ptr nano::transport::tcp_channels::collect_container_info (std::string const & name) From 8bfb8af34b2c6c02fdeb9761d286edf2d794ffe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 18:17:13 +0100 Subject: [PATCH 12/13] Rework `add_initial_peers` --- nano/node/node.cpp | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index c3dde6cfee..43d2b87ac6 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1116,15 +1116,22 @@ void nano::node::add_initial_peers () return; } - auto transaction (store.tx_begin_read ()); - for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i) + std::vector initial_peers; { - nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); - if (network.track_reachout (endpoint)) + auto transaction = store.tx_begin_read (); + for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i) { - network.tcp_channels.start_tcp (endpoint); + nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); + initial_peers.push_back (endpoint); } } + + logger.info (nano::log::type::node, "Adding cached initial peers: {}", initial_peers.size ()); + + for (auto const & peer : initial_peers) + { + network.merge_peer (peer); + } } void nano::node::start_election (std::shared_ptr const & block) From 0d61dbae7109d976ee581ef9f5acabe5dae23757 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 17 Mar 2024 10:21:56 +0100 Subject: [PATCH 13/13] Stop network component last --- nano/node/node.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 43d2b87ac6..054e3669f7 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -684,7 +684,6 @@ void nano::node::stop () generator.stop (); final_generator.stop (); confirmation_height_processor.stop (); - network.stop (); telemetry.stop (); websocket.stop (); bootstrap_server.stop (); @@ -696,6 +695,8 @@ void nano::node::stop () epoch_upgrader.stop (); workers.stop (); local_block_broadcaster.stop (); + network.stop (); // Stop network last to avoid killing in-use sockets + // work pool is not stopped on purpose due to testing setup }