diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index c9ebbcd90f..942fc178d7 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -61,7 +61,7 @@ TEST (network, construction_with_specified_port) auto const node = system.add_node (nano::node_config{ port, system.logging }); EXPECT_EQ (port, node->network.port); EXPECT_EQ (port, node->network.endpoint ().port ()); - EXPECT_EQ (port, node->tcp_listener.endpoint ().port ()); + EXPECT_EQ (port, node->tcp_listener->endpoint ().port ()); } TEST (network, construction_without_specified_port) @@ -71,7 +71,7 @@ TEST (network, construction_without_specified_port) auto const port = node->network.port.load (); EXPECT_NE (0, port); EXPECT_EQ (port, node->network.endpoint ().port ()); - EXPECT_EQ (port, node->tcp_listener.endpoint ().port ()); + EXPECT_EQ (port, node->tcp_listener->endpoint ().port ()); } TEST (network, send_node_id_handshake_tcp) @@ -646,8 +646,8 @@ TEST (node, port_mapping) TEST (tcp_listener, tcp_node_id_handshake) { nano::test::system system (1); - auto socket (std::make_shared (*system.nodes[0])); - auto bootstrap_endpoint (system.nodes[0]->tcp_listener.endpoint ()); + auto socket (std::make_shared (*system.nodes[0])); + auto bootstrap_endpoint (system.nodes[0]->tcp_listener->endpoint ()); auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint))); ASSERT_TRUE (cookie); nano::node_id_handshake::query_payload query{ *cookie }; @@ -684,9 +684,9 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty) { nano::test::system system (1); auto node0 (system.nodes[0]); - auto socket (std::make_shared (*node0)); + auto socket (std::make_shared (*node0)); std::atomic connected (false); - socket->async_connect (node0->tcp_listener.endpoint (), [&connected] (boost::system::error_code const & ec) { + socket->async_connect (node0->tcp_listener->endpoint (), [&connected] (boost::system::error_code const & ec) { ASSERT_FALSE (ec); connected = true; }); @@ -696,8 +696,8 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty) while (!disconnected) { { - nano::lock_guard guard (node0->tcp_listener.mutex); - disconnected = node0->tcp_listener.connections.empty (); + nano::lock_guard guard (node0->tcp_listener->mutex); + disconnected = node0->tcp_listener->connections.empty (); } ASSERT_NO_ERROR (system.poll ()); } @@ -707,13 +707,13 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) { nano::test::system system (1); auto node0 (system.nodes[0]); - auto socket (std::make_shared (*node0)); - auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener.endpoint ()))); + auto socket (std::make_shared (*node0)); + auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener->endpoint ()))); ASSERT_TRUE (cookie); nano::node_id_handshake::query_payload query{ *cookie }; nano::node_id_handshake node_id_handshake{ nano::dev::network_params.network, query }; auto channel = std::make_shared (*node0, socket); - socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) { + socket->async_connect (node0->tcp_listener->endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) { ASSERT_FALSE (ec); channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); @@ -721,16 +721,16 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) }); ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0); { - nano::lock_guard guard (node0->tcp_listener.mutex); - ASSERT_EQ (node0->tcp_listener.connections.size (), 1); + nano::lock_guard guard (node0->tcp_listener->mutex); + ASSERT_EQ (node0->tcp_listener->connections.size (), 1); } bool disconnected (false); system.deadline_set (std::chrono::seconds (20)); while (!disconnected) { { - nano::lock_guard guard (node0->tcp_listener.mutex); - disconnected = node0->tcp_listener.connections.empty (); + nano::lock_guard guard (node0->tcp_listener->mutex); + disconnected = node0->tcp_listener->connections.empty (); } ASSERT_NO_ERROR (system.poll ()); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 2627406a02..18eebf6520 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3184,7 +3184,7 @@ TEST (node, peers) node2->start (); ASSERT_TIMELY (10s, !node2->network.empty () && !node1->network.empty ()) // Wait to finish TCP node ID handshakes - ASSERT_TIMELY (10s, node1->tcp_listener.realtime_count != 0 && node2->tcp_listener.realtime_count != 0); + ASSERT_TIMELY (10s, node1->tcp_listener->realtime_count != 0 && node2->tcp_listener->realtime_count != 0); // Confirm that the peers match with the endpoints we are expecting ASSERT_EQ (1, node1->network.size ()); auto list1 (node1->network.list (2)); diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 344b6316b5..0ddefbe700 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -29,7 +29,7 @@ TEST (request_aggregator, one) .build_shared (); std::vector> request; request.emplace_back (send1->hash (), send1->root ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); ASSERT_EQ (1, node.aggregator.size ()); @@ -98,7 +98,7 @@ TEST (request_aggregator, one_update) ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *receive1).code); std::vector> request; request.emplace_back (send2->hash (), send2->root ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); request.clear (); @@ -165,7 +165,7 @@ TEST (request_aggregator, two) std::vector> request; request.emplace_back (send2->hash (), send2->root ()); request.emplace_back (receive1->hash (), receive1->root ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); // Process both blocks node.aggregator.add (dummy_channel, request); @@ -289,7 +289,7 @@ TEST (request_aggregator, split) election->force_confirm (); ASSERT_TIMELY (5s, max_vbh + 2 == node.ledger.cache.cemented_count); ASSERT_EQ (max_vbh + 1, request.size ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); ASSERT_EQ (1, node.aggregator.size ()); @@ -330,7 +330,7 @@ TEST (request_aggregator, channel_lifetime) request.emplace_back (send1->hash (), send1->root ()); { // The aggregator should extend the lifetime of the channel - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); } @@ -361,11 +361,11 @@ TEST (request_aggregator, channel_update) request.emplace_back (send1->hash (), send1->root ()); std::weak_ptr channel1_w; { - auto client1 = std::make_shared (node); + auto client1 = std::make_shared (node); std::shared_ptr dummy_channel1 = std::make_shared (node, client1); channel1_w = dummy_channel1; node.aggregator.add (dummy_channel1, request); - auto client2 = std::make_shared (node); + auto client2 = std::make_shared (node); std::shared_ptr dummy_channel2 = std::make_shared (node, client2); // The aggregator then hold channel2 and drop channel1 node.aggregator.add (dummy_channel2, request); @@ -399,7 +399,7 @@ TEST (request_aggregator, channel_max_queue) ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); std::vector> request; request.emplace_back (send1->hash (), send1->root ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); node.aggregator.add (dummy_channel, request); @@ -427,7 +427,7 @@ TEST (request_aggregator, unique) ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); std::vector> request; request.emplace_back (send1->hash (), send1->root ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); node.aggregator.add (dummy_channel, request); @@ -474,7 +474,7 @@ TEST (request_aggregator, cannot_vote) request.emplace_back (send2->hash (), send2->root ()); // Incorrect hash, correct root request.emplace_back (1, send2->root ()); - auto client = std::make_shared (node); + auto client = std::make_shared (node); std::shared_ptr dummy_channel = std::make_shared (node, client); node.aggregator.add (dummy_channel, request); ASSERT_EQ (1, node.aggregator.size ()); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index ea716bb3e1..3df857307c 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -24,23 +24,19 @@ TEST (socket, max_connections) auto node = system.add_node (); auto server_port = system.get_available_port (); - boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; - - // start a server socket that allows max 2 live connections - auto server_socket = std::make_shared (*node, listen_endpoint, 2); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); - - boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () }; // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + + // start a server socket that allows max 2 live connections + auto listener = std::make_shared (server_port, *node, 2); + listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; }); + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () }; + // client side connection tracking std::atomic connection_attempts = 0; auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { @@ -50,13 +46,13 @@ TEST (socket, max_connections) // start 3 clients, 2 will persist but 1 will be dropped - auto client1 = std::make_shared (*node); + auto client1 = std::make_shared (*node); client1->async_connect (dst_endpoint, connect_handler); - auto client2 = std::make_shared (*node); + auto client2 = std::make_shared (*node); client2->async_connect (dst_endpoint, connect_handler); - auto client3 = std::make_shared (*node); + auto client3 = std::make_shared (*node); client3->async_connect (dst_endpoint, connect_handler); auto get_tcp_accept_failures = [&node] () { @@ -70,35 +66,36 @@ TEST (socket, max_connections) ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 1); ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 2); ASSERT_TIMELY_EQ (5s, connection_attempts, 3); + ASSERT_TIMELY_EQ (5s, server_sockets.size (), 2); // create space for one socket and fill the connections table again server_sockets[0].reset (); - auto client4 = std::make_shared (*node); + auto client4 = std::make_shared (*node); client4->async_connect (dst_endpoint, connect_handler); - auto client5 = std::make_shared (*node); + auto client5 = std::make_shared (*node); client5->async_connect (dst_endpoint, connect_handler); ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 2); ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 3); ASSERT_TIMELY_EQ (5s, connection_attempts, 5); + ASSERT_TIMELY_EQ (5s, server_sockets.size (), 3); // close all existing sockets and fill the connections table again // start counting form 1 because 0 is the already closed socket server_sockets[1].reset (); server_sockets[2].reset (); - ASSERT_EQ (server_sockets.size (), 3); - auto client6 = std::make_shared (*node); + auto client6 = std::make_shared (*node); client6->async_connect (dst_endpoint, connect_handler); - auto client7 = std::make_shared (*node); + auto client7 = std::make_shared (*node); client7->async_connect (dst_endpoint, connect_handler); - auto client8 = std::make_shared (*node); + auto client8 = std::make_shared (*node); client8->async_connect (dst_endpoint, connect_handler); ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 3); @@ -117,27 +114,23 @@ TEST (socket, max_connections_per_ip) ASSERT_FALSE (node->flags.disable_max_peers_per_ip); auto server_port = system.get_available_port (); - boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; const auto max_ip_connections = node->network_params.network.max_peers_per_ip; ASSERT_TRUE (max_ip_connections >= 1); const auto max_global_connections = 1000; - auto server_socket = std::make_shared (*node, listen_endpoint, max_global_connections); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); - - boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () }; - // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + + auto listener = std::make_shared (server_port, *node, max_global_connections); + listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; }); + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () }; + // client side connection tracking std::atomic connection_attempts = 0; auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { @@ -151,7 +144,7 @@ TEST (socket, max_connections_per_ip) for (auto idx = 0; idx < max_ip_connections + 1; ++idx) { - auto client = std::make_shared (*node); + auto client = std::make_shared (*node); client->async_connect (dst_endpoint, connect_handler); client_list.push_back (client); } @@ -206,13 +199,13 @@ TEST (socket, count_subnetwork_connections) auto address5 = boost::asio::ip::make_address ("a41d:b7b3::"); // out of the network prefix auto address6 = boost::asio::ip::make_address ("a41d:b7b3::1"); // out of the network prefix - auto connection0 = std::make_shared (*node); - auto connection1 = std::make_shared (*node); - auto connection2 = std::make_shared (*node); - auto connection3 = std::make_shared (*node); - auto connection4 = std::make_shared (*node); - auto connection5 = std::make_shared (*node); - auto connection6 = std::make_shared (*node); + auto connection0 = std::make_shared (*node); + auto connection1 = std::make_shared (*node); + auto connection2 = std::make_shared (*node); + auto connection3 = std::make_shared (*node); + auto connection4 = std::make_shared (*node); + auto connection5 = std::make_shared (*node); + auto connection6 = std::make_shared (*node); nano::transport::address_socket_mmap connections_per_address; connections_per_address.emplace (address0, connection0); @@ -247,20 +240,17 @@ TEST (socket, max_connections_per_subnetwork) const auto max_global_connections = 1000; - auto server_socket = std::make_shared (*node, listen_endpoint, max_global_connections); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); - - boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () }; - // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + + auto listener = std::make_shared (server_port, *node, max_global_connections); + listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; }); + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () }; + // client side connection tracking std::atomic connection_attempts = 0; auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { @@ -274,7 +264,7 @@ TEST (socket, max_connections_per_subnetwork) for (auto idx = 0; idx < max_subnetwork_connections + 1; ++idx) { - auto client = std::make_shared (*node); + auto client = std::make_shared (*node); client->async_connect (dst_endpoint, connect_handler); client_list.push_back (client); } @@ -304,27 +294,23 @@ TEST (socket, disabled_max_peers_per_ip) ASSERT_TRUE (node->flags.disable_max_peers_per_ip); auto server_port = system.get_available_port (); - boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; const auto max_ip_connections = node->network_params.network.max_peers_per_ip; ASSERT_TRUE (max_ip_connections >= 1); const auto max_global_connections = 1000; - auto server_socket = std::make_shared (*node, listen_endpoint, max_global_connections); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); - - boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () }; - // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + + auto server_socket = std::make_shared (server_port, *node, max_global_connections); + server_socket->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; }); + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->endpoint ().port () }; + // client side connection tracking std::atomic connection_attempts = 0; auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { @@ -338,7 +324,7 @@ TEST (socket, disabled_max_peers_per_ip) for (auto idx = 0; idx < max_ip_connections + 1; ++idx) { - auto client = std::make_shared (*node); + auto client = std::make_shared (*node); client->async_connect (dst_endpoint, connect_handler); client_list.push_back (client); } @@ -372,25 +358,21 @@ TEST (socket, disconnection_of_silent_connections) auto node = system.add_node (config); auto server_port = system.get_available_port (); - boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; - - // start a server listening socket - auto server_socket = std::make_shared (*node, listen_endpoint, 1); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); - - boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () }; // on a connection, a server data socket is created. The shared pointer guarantees the object's lifecycle until the end of this test. std::shared_ptr server_data_socket; - server_socket->on_connection ([&server_data_socket] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + + // start a server listening socket + auto listener = std::make_shared (server_port, *node, 1); + listener->start ([&server_data_socket] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_data_socket = new_connection; return true; }); + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () }; + // Instantiates a client to simulate an incoming connection. - auto client_socket = std::make_shared (*node); + auto client_socket = std::make_shared (*node); std::atomic connected{ false }; // Opening a connection that will be closed because it remains silent during the tolerance time. client_socket->async_connect (dst_endpoint, [client_socket, &connected] (boost::system::error_code const & ec_a) { @@ -431,24 +413,18 @@ TEST (socket, drop_policy) auto func = [&] (size_t total_message_count, nano::transport::buffer_drop_policy drop_policy) { auto server_port (system.get_available_port ()); - boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::any (), server_port); - auto server_socket = std::make_shared (*node, endpoint, 1); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); - - // Accept connection, but don't read so the writer will drop. - server_socket->on_connection ([&connections] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + auto listener = std::make_shared (server_port, *node, 1); + listener->start ([&connections] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { connections.push_back (new_connection); return true; }); - auto client = std::make_shared (*node); + auto client = std::make_shared (*node); nano::transport::channel_tcp channel{ *node, client }; nano::test::counted_completion write_completion (static_cast (total_message_count)); - client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), server_socket->listening_port ()), + client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()), [&channel, total_message_count, node, &write_completion, &drop_policy, client] (boost::system::error_code const & ec_a) mutable { for (int i = 0; i < total_message_count; i++) { @@ -524,14 +500,10 @@ TEST (socket, concurrent_writes) auto server_port (system.get_available_port ()); boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::any (), server_port); - auto server_socket = std::make_shared (*node, endpoint, max_connections); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); std::vector> connections; - // On every new connection, start reading data - server_socket->on_connection ([&connections, &reader] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + auto listener = std::make_shared (server_port, *node, max_connections); + listener->start ([&connections, &reader] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { if (ec_a) { std::cerr << "on_connection: " << ec_a.message () << std::endl; @@ -549,9 +521,9 @@ TEST (socket, concurrent_writes) std::vector> clients; for (unsigned i = 0; i < client_count; i++) { - auto client = std::make_shared (*node); + auto client = std::make_shared (*node); clients.push_back (client); - client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_socket->listening_port ()), + client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), listener->endpoint ().port ()), [&connection_count_completion] (boost::system::error_code const & ec_a) { if (ec_a) { @@ -615,7 +587,7 @@ TEST (socket_timeout, connect) boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::make_address_v6 ("::ffff:10.255.254.253"), 1234); // create a client socket and try to connect to the IP address that wil not respond - auto socket = std::make_shared (*node); + auto socket = std::make_shared (*node); std::atomic done = false; boost::system::error_code ec; socket->async_connect (endpoint, [&ec, &done] (boost::system::error_code const & ec_a) { @@ -656,7 +628,7 @@ TEST (socket_timeout, read) }); // create a client socket to connect and call async_read, which should time out - auto socket = std::make_shared (*node); + auto socket = std::make_shared (*node); std::atomic done = false; boost::system::error_code ec; socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { @@ -704,7 +676,7 @@ TEST (socket_timeout, write) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers + auto socket = std::make_shared (*node, nano::transport::socket::endpoint_type_t::client, 1024 * 64); // socket with a max queue size much larger than OS buffers std::atomic done = false; boost::system::error_code ec; socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { @@ -759,7 +731,7 @@ TEST (socket_timeout, read_overlapped) }); // create a client socket to connect and call async_read twice, the second call should time out - auto socket = std::make_shared (*node); + auto socket = std::make_shared (*node); std::atomic done = false; boost::system::error_code ec; socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { @@ -818,7 +790,7 @@ TEST (socket_timeout, write_overlapped) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers + auto socket = std::make_shared (*node, nano::transport::socket::endpoint_type_t::client, 1024 * 64); // socket with a max queue size much larger than OS buffers std::atomic done = false; boost::system::error_code ec; socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index 21f86262bf..6287add5a3 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -151,7 +151,7 @@ std::shared_ptr nano::bootstrap_connections::find_connec void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & endpoint_a, bool push_front) { ++connections_count; - auto socket (std::make_shared (node)); + auto socket (std::make_shared (node)); auto this_l (shared_from_this ()); socket->async_connect (endpoint_a, [this_l, socket, endpoint_a, push_front] (boost::system::error_code const & ec) { diff --git a/nano/node/node.cpp b/nano/node/node.cpp index fb14c3c81c..7140fa38b2 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -171,7 +171,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons // Thus, be very careful if you change the order: if `bootstrap` gets constructed before `network`, // the latter would inherit the port from the former (if TCP is active, otherwise `network` picks first) // - tcp_listener (network.port, *this), + tcp_listener{ std::make_shared (network.port, *this, config.tcp_incoming_connections_max) }, application_path (application_path_a), port_mapping (*this), rep_crawler (*this), @@ -553,7 +553,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.ledger, "ledger")); composite->add_component (collect_container_info (node.active, "active")); composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator")); - composite->add_component (collect_container_info (node.tcp_listener, "tcp_listener")); + composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener")); composite->add_component (collect_container_info (node.network, "network")); composite->add_component (node.telemetry.collect_container_info ("telemetry")); composite->add_component (collect_container_info (node.workers, "workers")); @@ -646,12 +646,18 @@ void nano::node::start () bool tcp_enabled = false; if (config.tcp_incoming_connections_max > 0 && !(flags.disable_bootstrap_listener && flags.disable_tcp_realtime)) { - tcp_listener.start (); + tcp_listener->start ([this] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + if (!ec_a) + { + tcp_listener->accept_action (ec_a, new_connection); + } + return true; + }); tcp_enabled = true; - if (network.port != tcp_listener.port) + if (network.port != tcp_listener->endpoint ().port ()) { - network.port = tcp_listener.port; + network.port = tcp_listener->endpoint ().port (); } logger.always_log (boost::str (boost::format ("Node started with peering port `%1%`.") % network.port)); @@ -725,7 +731,7 @@ void nano::node::stop () websocket.stop (); bootstrap_server.stop (); bootstrap_initiator.stop (); - tcp_listener.stop (); + tcp_listener->stop (); port_mapping.stop (); wallets.stop (); stats.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index e396f20a0c..4e7b567444 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -163,7 +163,7 @@ class node final : public std::enable_shared_from_this nano::telemetry telemetry; nano::bootstrap_initiator bootstrap_initiator; nano::bootstrap_server bootstrap_server; - nano::transport::tcp_listener tcp_listener; + std::shared_ptr tcp_listener; std::filesystem::path application_path; nano::node_observers observers; nano::port_mapping port_mapping; diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index 629fe3b5fa..864c140a73 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -14,25 +14,6 @@ #include #include -namespace -{ -bool is_temporary_error (boost::system::error_code const & ec_a) -{ - switch (ec_a.value ()) - { -#if EAGAIN != EWOULDBLOCK - case EAGAIN: -#endif - - case EWOULDBLOCK: - case EINTR: - return true; - default: - return false; - } -} -} - /* * socket */ @@ -437,48 +418,6 @@ bool nano::transport::socket::write_queue::empty () const }); } -/* - * server_socket - */ - -nano::transport::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) : - socket{ node_a, endpoint_type_t::server }, - acceptor{ node_a.io_ctx }, - local{ std::move (local_a) }, - max_inbound_connections{ max_connections_a } -{ - default_timeout = std::chrono::seconds::max (); -} - -void nano::transport::server_socket::start (boost::system::error_code & ec_a) -{ - acceptor.open (local.protocol ()); - acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); - acceptor.bind (local, ec_a); - if (!ec_a) - { - acceptor.listen (boost::asio::socket_base::max_listen_connections, ec_a); - } -} - -void nano::transport::server_socket::close () -{ - auto this_l (std::static_pointer_cast (shared_from_this ())); - - boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] () { - this_l->close_internal (); - this_l->acceptor.close (); - for (auto & address_connection_pair : this_l->connections_per_address) - { - if (auto connection_l = address_connection_pair.second.lock ()) - { - connection_l->close (); - } - } - this_l->connections_per_address.clear (); - })); -} - boost::asio::ip::network_v6 nano::transport::socket_functions::get_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) { return boost::asio::ip::make_network_v6 (ip_address, static_cast (network_prefix)); @@ -514,156 +453,6 @@ std::size_t network_prefix) return counted_connections; } -bool nano::transport::server_socket::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) -{ - debug_assert (strand.running_in_this_thread ()); - if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ())) - { - // If the limit is disabled, then it is unreachable. - // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64. - return false; - } - auto const counted_connections = socket_functions::count_subnetwork_connections ( - connections_per_address, - new_connection->remote.address ().to_v6 (), - node.network_params.network.ipv6_subnetwork_prefix_for_limiting); - return counted_connections >= node.network_params.network.max_peers_per_subnetwork; -} - -bool nano::transport::server_socket::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) -{ - debug_assert (strand.running_in_this_thread ()); - if (node.flags.disable_max_peers_per_ip) - { - // If the limit is disabled, then it is unreachable. - return false; - } - auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); - auto const counted_connections = static_cast (std::abs (std::distance (address_connections_range.first, address_connections_range.second))); - return counted_connections >= node.network_params.network.max_peers_per_ip; -} - -void nano::transport::server_socket::on_connection (std::function const &, boost::system::error_code const &)> callback_a) -{ - auto this_l (std::static_pointer_cast (shared_from_this ())); - - boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback = std::move (callback_a)] () mutable { - if (!this_l->acceptor.is_open ()) - { - this_l->node.logger.always_log ("Network: Acceptor is not open"); - return; - } - - // Prepare new connection - auto new_connection = std::make_shared (this_l->node, endpoint_type_t::server); - this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, - boost::asio::bind_executor (this_l->strand, - [this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable { - this_l->evict_dead_connections (); - - if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) - { - this_l->node.logger.try_log ("Network: max_inbound_connections reached, unable to open new connection"); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (this_l->limit_reached_for_incoming_ip_connections (new_connection)) - { - auto const remote_ip_address = new_connection->remote_endpoint ().address (); - auto const log_message = boost::str ( - boost::format ("Network: max connections per IP (max_peers_per_ip) was reached for %1%, unable to open new connection") - % remote_ip_address.to_string ()); - this_l->node.logger.try_log (log_message); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) - { - auto const remote_ip_address = new_connection->remote_endpoint ().address (); - debug_assert (remote_ip_address.is_v6 ()); - auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork); - auto const log_message = boost::str ( - boost::format ("Network: max connections per subnetwork (max_peers_per_subnetwork) was reached for subnetwork %1% (remote IP: %2%), unable to open new connection") - % remote_subnet.canonical ().to_string () - % remote_ip_address.to_string ()); - this_l->node.logger.try_log (log_message); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (!ec_a) - { - // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start - // an IO operation immediately, which will start a timer. - new_connection->start (); - new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); - this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); - this_l->node.observers.socket_accepted.notify (*new_connection); - if (cbk (new_connection, ec_a)) - { - this_l->on_connection (std::move (cbk)); - return; - } - this_l->node.logger.always_log ("Network: Stopping to accept connections"); - return; - } - - // accept error - this_l->node.logger.try_log ("Network: Unable to accept connection: ", ec_a.message ()); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - - if (is_temporary_error (ec_a)) - { - // if it is a temporary error, just retry it - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - // if it is not a temporary error, check how the listener wants to handle this error - if (cbk (new_connection, ec_a)) - { - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - // No requeue if we reach here, no incoming socket connections will be handled - this_l->node.logger.always_log ("Network: Stopping to accept connections"); - })); - })); -} - -// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept. -// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and -// give the rest of the system a chance to recover. -void nano::transport::server_socket::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) -{ - auto this_l (std::static_pointer_cast (shared_from_this ())); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l, callback = std::move (callback_a)] () mutable { - this_l->on_connection (std::move (callback)); - }); -} - -// This must be called from a strand -void nano::transport::server_socket::evict_dead_connections () -{ - debug_assert (strand.running_in_this_thread ()); - for (auto it = connections_per_address.begin (); it != connections_per_address.end ();) - { - if (it->second.expired ()) - { - it = connections_per_address.erase (it); - continue; - } - ++it; - } -} - std::string nano::transport::socket_type_to_string (nano::transport::socket::type_t type) { switch (type) diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index d0afec196b..b2593b1ba2 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -38,14 +38,12 @@ enum class buffer_drop_policy no_socket_drop }; -class server_socket; - /** Socket class for tcp clients and newly accepted connections */ -class socket : public std::enable_shared_from_this +class socket final : public std::enable_shared_from_this { - friend class server_socket; friend class tcp_server; friend class tcp_channels; + friend class tcp_listener; public: static std::size_t constexpr default_max_queue_size = 128; @@ -69,8 +67,8 @@ class socket : public std::enable_shared_from_this * @param node Owning node * @param endpoint_type_a The endpoint's type: either server or client */ - explicit socket (nano::node & node, endpoint_type_t endpoint_type_a, std::size_t max_queue_size = default_max_queue_size); - virtual ~socket (); + explicit socket (nano::node & node, endpoint_type_t endpoint_type_a = endpoint_type_t::client, std::size_t max_queue_size = default_max_queue_size); + ~socket (); void start (); @@ -78,7 +76,7 @@ class socket : public std::enable_shared_from_this void async_read (std::shared_ptr> const &, std::size_t, std::function); void async_write (nano::shared_const_buffer const &, std::function callback = {}, nano::transport::traffic_type = nano::transport::traffic_type::generic); - virtual void close (); + void close (); boost::asio::ip::tcp::endpoint remote_endpoint () const; boost::asio::ip::tcp::endpoint local_endpoint () const; /** Returns true if the socket has timed out */ @@ -220,52 +218,4 @@ namespace socket_functions boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); std::size_t count_subnetwork_connections (nano::transport::address_socket_mmap const &, boost::asio::ip::address_v6 const &, std::size_t); } - -/** Socket class for TCP servers */ -class server_socket final : public socket -{ -public: - /** - * Constructor - * @param node_a Owning node - * @param local_a Address and port to listen on - * @param max_connections_a Maximum number of concurrent connections - */ - explicit server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a); - /**Start accepting new connections */ - void start (boost::system::error_code &); - /** Stop accepting new connections */ - void close () override; - /** Register callback for new connections. The callback must return true to keep accepting new connections. */ - void on_connection (std::function const & new_connection, boost::system::error_code const &)>); - uint16_t listening_port () - { - return acceptor.local_endpoint ().port (); - } - -private: - nano::transport::address_socket_mmap connections_per_address; - boost::asio::ip::tcp::acceptor acceptor; - boost::asio::ip::tcp::endpoint local; - std::size_t max_inbound_connections; - void evict_dead_connections (); - void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); - /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ - bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); - bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); -}; - -/** Socket class for TCP clients */ -class client_socket final : public socket -{ -public: - /** - * Constructor - * @param node_a Owning node - */ - explicit client_socket (nano::node & node_a, std::size_t max_queue_size = default_max_queue_size) : - socket{ node_a, endpoint_type_t::client, max_queue_size } - { - } -}; } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index c62bf3dbed..543b13b5ea 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -541,7 +541,7 @@ void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a) { - auto socket = std::make_shared (node); + auto socket = std::make_shared (node); std::weak_ptr socket_w (socket); auto channel (std::make_shared (node, socket_w)); std::weak_ptr node_w (node.shared ()); diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index e9ea664a87..57222dc365 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -10,59 +10,57 @@ #include +namespace +{ +bool is_temporary_error (boost::system::error_code const & ec_a) +{ + switch (ec_a.value ()) + { +#if EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + + case EWOULDBLOCK: + case EINTR: + return true; + default: + return false; + } +} +} + /* * tcp_listener */ -nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a) : +nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) : node (node_a), - port (port_a) + strand{ node_a.io_ctx.get_executor () }, + acceptor{ node_a.io_ctx }, + local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } }, + max_inbound_connections{ max_inbound_connections } { } -void nano::transport::tcp_listener::start () +void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) { nano::lock_guard lock{ mutex }; on = true; - listening_socket = std::make_shared (node, boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max); + acceptor.open (local.protocol ()); + acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); boost::system::error_code ec; - listening_socket->start (ec); - if (ec) + acceptor.bind (local, ec); + if (!ec) { - node.logger.always_log (boost::str (boost::format ("Network: Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ())); - throw std::runtime_error (ec.message ()); + acceptor.listen (boost::asio::socket_base::max_listen_connections, ec); } - - // the user can either specify a port value in the config or it can leave the choice up to the OS: - // (1): port specified - // (2): port not specified - // - const auto listening_port = listening_socket->listening_port (); + if (ec) { - // (1) -- nothing to do, just check that port values match everywhere - // - if (port == listening_port) - { - debug_assert (port == node.network.port); - debug_assert (port == node.network.endpoint ().port ()); - } - // (2) -- OS port choice happened at TCP socket bind time, so propagate this port value back; - // the propagation is done here for the `tcp_listener` itself, whereas for `network`, the node does it - // after calling `tcp_listener.start ()` - // - else - { - port = listening_port; - } + node.logger.always_log (boost::str (boost::format ("Network: Error while binding for incoming TCP/bootstrap on port %1%: %2%") % acceptor.local_endpoint ().port () % ec.message ())); + throw std::runtime_error (ec.message ()); } - listening_socket->on_connection ([this] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - if (!ec_a) - { - accept_action (ec_a, new_connection); - } - return true; - }); + on_connection (callback_a); } void nano::transport::tcp_listener::stop () @@ -73,12 +71,18 @@ void nano::transport::tcp_listener::stop () on = false; connections_l.swap (connections); } - if (listening_socket) - { - nano::lock_guard lock{ mutex }; - listening_socket->close (); - listening_socket = nullptr; - } + nano::lock_guard lock{ mutex }; + boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this ()] () { + this_l->acceptor.close (); + for (auto & address_connection_pair : this_l->connections_per_address) + { + if (auto connection_l = address_connection_pair.second.lock ()) + { + connection_l->close (); + } + } + this_l->connections_per_address.clear (); + })); } std::size_t nano::transport::tcp_listener::connection_count () @@ -87,6 +91,153 @@ std::size_t nano::transport::tcp_listener::connection_count () return connections.size (); } +bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ())) + { + // If the limit is disabled, then it is unreachable. + // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64. + return false; + } + auto const counted_connections = socket_functions::count_subnetwork_connections ( + connections_per_address, + new_connection->remote.address ().to_v6 (), + node.network_params.network.ipv6_subnetwork_prefix_for_limiting); + return counted_connections >= node.network_params.network.max_peers_per_subnetwork; +} + +bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_ip) + { + // If the limit is disabled, then it is unreachable. + return false; + } + auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); + auto const counted_connections = static_cast (std::abs (std::distance (address_connections_range.first, address_connections_range.second))); + return counted_connections >= node.network_params.network.max_peers_per_ip; +} + +void nano::transport::tcp_listener::on_connection (std::function const &, boost::system::error_code const &)> callback_a) +{ + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { + if (!this_l->acceptor.is_open ()) + { + this_l->node.logger.always_log ("Network: Acceptor is not open"); + return; + } + + // Prepare new connection + auto new_connection = std::make_shared (this_l->node, socket::endpoint_type_t::server); + this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, + boost::asio::bind_executor (this_l->strand, + [this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable { + this_l->evict_dead_connections (); + + if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) + { + this_l->node.logger.try_log ("Network: max_inbound_connections reached, unable to open new connection"); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (this_l->limit_reached_for_incoming_ip_connections (new_connection)) + { + auto const remote_ip_address = new_connection->remote_endpoint ().address (); + auto const log_message = boost::str ( + boost::format ("Network: max connections per IP (max_peers_per_ip) was reached for %1%, unable to open new connection") + % remote_ip_address.to_string ()); + this_l->node.logger.try_log (log_message); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) + { + auto const remote_ip_address = new_connection->remote_endpoint ().address (); + debug_assert (remote_ip_address.is_v6 ()); + auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork); + auto const log_message = boost::str ( + boost::format ("Network: max connections per subnetwork (max_peers_per_subnetwork) was reached for subnetwork %1% (remote IP: %2%), unable to open new connection") + % remote_subnet.canonical ().to_string () + % remote_ip_address.to_string ()); + this_l->node.logger.try_log (log_message); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (!ec_a) + { + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start + // an IO operation immediately, which will start a timer. + new_connection->start (); + new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); + this_l->node.observers.socket_accepted.notify (*new_connection); + if (cbk (new_connection, ec_a)) + { + this_l->on_connection (std::move (cbk)); + return; + } + this_l->node.logger.always_log ("Network: Stopping to accept connections"); + return; + } + + // accept error + this_l->node.logger.try_log ("Network: Unable to accept connection: ", ec_a.message ()); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + + if (is_temporary_error (ec_a)) + { + // if it is a temporary error, just retry it + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + // if it is not a temporary error, check how the listener wants to handle this error + if (cbk (new_connection, ec_a)) + { + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + // No requeue if we reach here, no incoming socket connections will be handled + this_l->node.logger.always_log ("Network: Stopping to accept connections"); + })); + })); +} + +// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept. +// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and +// give the rest of the system a chance to recover. +void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) +{ + node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { + this_l->on_connection (std::move (callback)); + }); +} + +// This must be called from a strand +void nano::transport::tcp_listener::evict_dead_connections () +{ + debug_assert (strand.running_in_this_thread ()); + for (auto it = connections_per_address.begin (); it != connections_per_address.end ();) + { + if (it->second.expired ()) + { + it = connections_per_address.erase (it); + continue; + } + ++it; + } +} + void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr const & socket_a) { if (!node.network.excluded_peers.check (socket_a->remote_endpoint ())) @@ -109,9 +260,9 @@ void nano::transport::tcp_listener::accept_action (boost::system::error_code con boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () { nano::lock_guard lock{ mutex }; - if (on && listening_socket) + if (on) { - return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), port); + return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()); } else { @@ -160,11 +311,11 @@ nano::transport::tcp_server::~tcp_server () if (socket->type () == nano::transport::socket::type_t::bootstrap) { - --node->tcp_listener.bootstrap_count; + --node->tcp_listener->bootstrap_count; } else if (socket->type () == nano::transport::socket::type_t::realtime) { - --node->tcp_listener.realtime_count; + --node->tcp_listener->realtime_count; // Clear temporary channel auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint)); @@ -177,8 +328,8 @@ nano::transport::tcp_server::~tcp_server () stop (); - nano::lock_guard lock{ node->tcp_listener.mutex }; - node->tcp_listener.connections.erase (this); + nano::lock_guard lock{ node->tcp_listener->mutex }; + node->tcp_listener->connections.erase (this); } void nano::transport::tcp_server::start () @@ -636,8 +787,8 @@ void nano::transport::tcp_server::timeout () node->logger.try_log ("Closing incoming tcp / bootstrap server by timeout"); } { - nano::lock_guard lock{ node->tcp_listener.mutex }; - node->tcp_listener.connections.erase (this); + nano::lock_guard lock{ node->tcp_listener->mutex }; + node->tcp_listener->connections.erase (this); } socket->close (); } @@ -658,7 +809,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection () { return false; } - if (node->tcp_listener.bootstrap_count >= node->config.bootstrap_connections_max) + if (node->tcp_listener->bootstrap_count >= node->config.bootstrap_connections_max) { return false; } @@ -667,7 +818,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection () return false; } - ++node->tcp_listener.bootstrap_count; + ++node->tcp_listener->bootstrap_count; socket->type_set (nano::transport::socket::type_t::bootstrap); return true; } @@ -682,7 +833,7 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime) { remote_node_id = node_id; - ++node->tcp_listener.realtime_count; + ++node->tcp_listener->realtime_count; socket->type_set (nano::transport::socket::type_t::realtime); return true; } diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 2cb0fb39b2..e011c50630 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -19,11 +19,11 @@ class tcp_server; /** * Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected. */ -class tcp_listener final +class tcp_listener final : public std::enable_shared_from_this { public: - tcp_listener (uint16_t, nano::node &); - void start (); + tcp_listener (uint16_t, nano::node &, std::size_t); + void start (std::function const &, boost::system::error_code const &)> callback_a); void stop (); void accept_action (boost::system::error_code const &, std::shared_ptr const &); std::size_t connection_count (); @@ -32,11 +32,22 @@ class tcp_listener final std::unordered_map> connections; nano::tcp_endpoint endpoint (); nano::node & node; - std::shared_ptr listening_socket; bool on{ false }; std::atomic bootstrap_count{ 0 }; std::atomic realtime_count{ 0 }; - uint16_t port; + +private: + boost::asio::strand strand; + nano::transport::address_socket_mmap connections_per_address; + boost::asio::ip::tcp::acceptor acceptor; + boost::asio::ip::tcp::endpoint local; + std::size_t max_inbound_connections; + void on_connection (std::function const &, boost::system::error_code const &)> callback_a); + void evict_dead_connections (); + void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); + /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ + bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); + bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); }; std::unique_ptr collect_container_info (tcp_listener & bootstrap_listener, std::string const & name); diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index cb0e1974cb..b0f2ff1dfc 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -64,8 +64,8 @@ std::shared_ptr nano::test::system::add_node (nano::node_config cons auto starting_size_1 = node1->network.size (); auto starting_size_2 = node2->network.size (); - auto starting_realtime_1 = node1->tcp_listener.realtime_count.load (); - auto starting_realtime_2 = node2->tcp_listener.realtime_count.load (); + auto starting_realtime_1 = node1->tcp_listener->realtime_count.load (); + auto starting_realtime_2 = node2->tcp_listener->realtime_count.load (); auto starting_keepalives_1 = node1->stats.count (stat::type::message, stat::detail::keepalive, stat::dir::in); auto starting_keepalives_2 = node2->stats.count (stat::type::message, stat::detail::keepalive, stat::dir::in); @@ -88,8 +88,8 @@ std::shared_ptr nano::test::system::add_node (nano::node_config cons { // Wait for initial connection finish auto ec = poll_until_true (5s, [&node1, &node2, starting_realtime_1, starting_realtime_2] () { - auto realtime_1 = node1->tcp_listener.realtime_count.load (); - auto realtime_2 = node2->tcp_listener.realtime_count.load (); + auto realtime_1 = node1->tcp_listener->realtime_count.load (); + auto realtime_2 = node2->tcp_listener->realtime_count.load (); return realtime_1 > starting_realtime_1 && realtime_2 > starting_realtime_2; }); debug_assert (!ec);