Skip to content

Commit

Permalink
Merge server_socket in to tcp_listener since it's only purpose was to…
Browse files Browse the repository at this point in the history
… attach a boost tcp acceptor to the socket class. It made no use of socket functionality.
  • Loading branch information
clemahieu committed Jan 9, 2024
1 parent b2b4232 commit 335d5bc
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 392 deletions.
24 changes: 12 additions & 12 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TEST (network, construction_with_specified_port)
auto const node = system.add_node (nano::node_config{ port, system.logging });
EXPECT_EQ (port, node->network.port);
EXPECT_EQ (port, node->network.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener->endpoint ().port ());
}

TEST (network, construction_without_specified_port)
Expand All @@ -71,7 +71,7 @@ TEST (network, construction_without_specified_port)
auto const port = node->network.port.load ();
EXPECT_NE (0, port);
EXPECT_EQ (port, node->network.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener.endpoint ().port ());
EXPECT_EQ (port, node->tcp_listener->endpoint ().port ());
}

TEST (network, send_node_id_handshake_tcp)
Expand Down Expand Up @@ -647,7 +647,7 @@ TEST (tcp_listener, tcp_node_id_handshake)
{
nano::test::system system (1);
auto socket (std::make_shared<nano::transport::client_socket> (*system.nodes[0]));
auto bootstrap_endpoint (system.nodes[0]->tcp_listener.endpoint ());
auto bootstrap_endpoint (system.nodes[0]->tcp_listener->endpoint ());
auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint)));
ASSERT_TRUE (cookie);
nano::node_id_handshake::query_payload query{ *cookie };
Expand Down Expand Up @@ -686,7 +686,7 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
auto node0 (system.nodes[0]);
auto socket (std::make_shared<nano::transport::client_socket> (*node0));
std::atomic<bool> connected (false);
socket->async_connect (node0->tcp_listener.endpoint (), [&connected] (boost::system::error_code const & ec) {
socket->async_connect (node0->tcp_listener->endpoint (), [&connected] (boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
connected = true;
});
Expand All @@ -696,8 +696,8 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener.mutex);
disconnected = node0->tcp_listener.connections.empty ();
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
ASSERT_NO_ERROR (system.poll ());
}
Expand All @@ -708,29 +708,29 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
nano::test::system system (1);
auto node0 (system.nodes[0]);
auto socket (std::make_shared<nano::transport::client_socket> (*node0));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener.endpoint ())));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener->endpoint ())));
ASSERT_TRUE (cookie);
nano::node_id_handshake::query_payload query{ *cookie };
nano::node_id_handshake node_id_handshake{ nano::dev::network_params.network, query };
auto channel = std::make_shared<nano::transport::channel_tcp> (*node0, socket);
socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) {
socket->async_connect (node0->tcp_listener->endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener.mutex);
ASSERT_EQ (node0->tcp_listener.connections.size (), 1);
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
}
bool disconnected (false);
system.deadline_set (std::chrono::seconds (20));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener.mutex);
disconnected = node0->tcp_listener.connections.empty ();
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
ASSERT_NO_ERROR (system.poll ());
}
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3184,7 +3184,7 @@ TEST (node, peers)
node2->start ();
ASSERT_TIMELY (10s, !node2->network.empty () && !node1->network.empty ())
// Wait to finish TCP node ID handshakes
ASSERT_TIMELY (10s, node1->tcp_listener.realtime_count != 0 && node2->tcp_listener.realtime_count != 0);
ASSERT_TIMELY (10s, node1->tcp_listener->realtime_count != 0 && node2->tcp_listener->realtime_count != 0);
// Confirm that the peers match with the endpoints we are expecting
ASSERT_EQ (1, node1->network.size ());
auto list1 (node1->network.list (2));
Expand Down
95 changes: 33 additions & 62 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@ TEST (socket, max_connections)
auto node = system.add_node ();

auto server_port = system.get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };

// start a server socket that allows max 2 live connections
auto server_socket = std::make_shared<nano::transport::server_socket> (*node, listen_endpoint, 2);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () };

// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {

// start a server socket that allows max 2 live connections
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, 2);
listener->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () };

// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
Expand Down Expand Up @@ -118,27 +114,23 @@ TEST (socket, max_connections_per_ip)
ASSERT_FALSE (node->flags.disable_max_peers_per_ip);

auto server_port = system.get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };

const auto max_ip_connections = node->network_params.network.max_peers_per_ip;
ASSERT_TRUE (max_ip_connections >= 1);

const auto max_global_connections = 1000;

auto server_socket = std::make_shared<nano::transport::server_socket> (*node, listen_endpoint, max_global_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () };

// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {

auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_global_connections);
listener->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () };

// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
Expand Down Expand Up @@ -248,20 +240,17 @@ TEST (socket, max_connections_per_subnetwork)

const auto max_global_connections = 1000;

auto server_socket = std::make_shared<nano::transport::server_socket> (*node, listen_endpoint, max_global_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () };

// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {

auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_global_connections);
listener->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () };

// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
Expand Down Expand Up @@ -305,27 +294,23 @@ TEST (socket, disabled_max_peers_per_ip)
ASSERT_TRUE (node->flags.disable_max_peers_per_ip);

auto server_port = system.get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };

const auto max_ip_connections = node->network_params.network.max_peers_per_ip;
ASSERT_TRUE (max_ip_connections >= 1);

const auto max_global_connections = 1000;

auto server_socket = std::make_shared<nano::transport::server_socket> (*node, listen_endpoint, max_global_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () };

// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
server_socket->on_connection ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {

auto server_socket = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_global_connections);
server_socket->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
});

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->endpoint ().port () };

// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
Expand Down Expand Up @@ -373,23 +358,19 @@ TEST (socket, disconnection_of_silent_connections)
auto node = system.add_node (config);

auto server_port = system.get_available_port ();
boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port };

// start a server listening socket
auto server_socket = std::make_shared<nano::transport::server_socket> (*node, listen_endpoint, 1);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_socket->listening_port () };

// on a connection, a server data socket is created. The shared pointer guarantees the object's lifecycle until the end of this test.
std::shared_ptr<nano::transport::socket> server_data_socket;
server_socket->on_connection ([&server_data_socket] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {

// start a server listening socket
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, 1);
listener->start ([&server_data_socket] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_data_socket = new_connection;
return true;
});

boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), listener->endpoint ().port () };

// Instantiates a client to simulate an incoming connection.
auto client_socket = std::make_shared<nano::transport::client_socket> (*node);
std::atomic<bool> connected{ false };
Expand Down Expand Up @@ -432,15 +413,9 @@ TEST (socket, drop_policy)

auto func = [&] (size_t total_message_count, nano::transport::buffer_drop_policy drop_policy) {
auto server_port (system.get_available_port ());
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::any (), server_port);

auto server_socket = std::make_shared<nano::transport::server_socket> (*node, endpoint, 1);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);

// Accept connection, but don't read so the writer will drop.
server_socket->on_connection ([&connections] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, 1);
listener->start ([&connections] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
connections.push_back (new_connection);
return true;
});
Expand All @@ -449,7 +424,7 @@ TEST (socket, drop_policy)
nano::transport::channel_tcp channel{ *node, client };
nano::test::counted_completion write_completion (static_cast<unsigned> (total_message_count));

client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), server_socket->listening_port ()),
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()),
[&channel, total_message_count, node, &write_completion, &drop_policy, client] (boost::system::error_code const & ec_a) mutable {
for (int i = 0; i < total_message_count; i++)
{
Expand Down Expand Up @@ -525,14 +500,10 @@ TEST (socket, concurrent_writes)
auto server_port (system.get_available_port ());
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::any (), server_port);

auto server_socket = std::make_shared<nano::transport::server_socket> (*node, endpoint, max_connections);
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
std::vector<std::shared_ptr<nano::transport::socket>> connections;

// On every new connection, start reading data
server_socket->on_connection ([&connections, &reader] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_connections);
listener->start ([&connections, &reader] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
if (ec_a)
{
std::cerr << "on_connection: " << ec_a.message () << std::endl;
Expand All @@ -552,7 +523,7 @@ TEST (socket, concurrent_writes)
{
auto client = std::make_shared<nano::transport::client_socket> (*node);
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_socket->listening_port ()),
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), listener->endpoint ().port ()),
[&connection_count_completion] (boost::system::error_code const & ec_a) {
if (ec_a)
{
Expand Down
18 changes: 12 additions & 6 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
// Thus, be very careful if you change the order: if `bootstrap` gets constructed before `network`,
// the latter would inherit the port from the former (if TCP is active, otherwise `network` picks first)
//
tcp_listener (network.port, *this),
tcp_listener{ std::make_shared<nano::transport::tcp_listener> (network.port, *this, config.tcp_incoming_connections_max) },
application_path (application_path_a),
port_mapping (*this),
rep_crawler (*this),
Expand Down Expand Up @@ -553,7 +553,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.ledger, "ledger"));
composite->add_component (collect_container_info (node.active, "active"));
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
composite->add_component (collect_container_info (node.tcp_listener, "tcp_listener"));
composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener"));
composite->add_component (collect_container_info (node.network, "network"));
composite->add_component (node.telemetry.collect_container_info ("telemetry"));
composite->add_component (collect_container_info (node.workers, "workers"));
Expand Down Expand Up @@ -646,12 +646,18 @@ void nano::node::start ()
bool tcp_enabled = false;
if (config.tcp_incoming_connections_max > 0 && !(flags.disable_bootstrap_listener && flags.disable_tcp_realtime))
{
tcp_listener.start ();
tcp_listener->start ([this] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
if (!ec_a)
{
tcp_listener->accept_action (ec_a, new_connection);
}
return true;
});
tcp_enabled = true;

if (network.port != tcp_listener.port)
if (network.port != tcp_listener->endpoint ().port ())
{
network.port = tcp_listener.port;
network.port = tcp_listener->endpoint ().port ();
}

logger.always_log (boost::str (boost::format ("Node started with peering port `%1%`.") % network.port));
Expand Down Expand Up @@ -725,7 +731,7 @@ void nano::node::stop ()
websocket.stop ();
bootstrap_server.stop ();
bootstrap_initiator.stop ();
tcp_listener.stop ();
tcp_listener->stop ();
port_mapping.stop ();
wallets.stop ();
stats.stop ();
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class node final : public std::enable_shared_from_this<nano::node>
nano::telemetry telemetry;
nano::bootstrap_initiator bootstrap_initiator;
nano::bootstrap_server bootstrap_server;
nano::transport::tcp_listener tcp_listener;
std::shared_ptr<nano::transport::tcp_listener> tcp_listener;
std::filesystem::path application_path;
nano::node_observers observers;
nano::port_mapping port_mapping;
Expand Down
Loading

0 comments on commit 335d5bc

Please sign in to comment.