Skip to content

Commit

Permalink
Deadline timer cleanup (#2921)
Browse files Browse the repository at this point in the history
* Converting the socket write queue to an atomic number since strands already ensure non-concurrent execution.

* Moving responsibility for message drop policy in to tcp channel, rather than in the socket class itself.
Having the socket class handle the limiter drop policy is a layering violation where the socket class knows about specific usages by its clients.
This simplifies the socket class to be a more light weight wrapper around raw TCP sockets and reduces complexity.

* Making max value a static constexpr.

* Removing boost::optional for io_timeout as it's unnecessary.

* With io_timeout being atomic it can be set without strand synchronization.

Co-authored-by: Sergey Kroshnin <[email protected]>
Co-authored-by: Thiago Silva <[email protected]>
  • Loading branch information
3 people authored Nov 16, 2021
1 parent e8bf333 commit b3f6f27
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 31 deletions.
20 changes: 10 additions & 10 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ TEST (socket, max_connections)

// start 3 clients, 2 will persist but 1 will be dropped

auto client1 = std::make_shared<nano::socket> (*node, boost::none);
auto client1 = std::make_shared<nano::socket> (*node);
client1->async_connect (dst_endpoint, connect_handler);

auto client2 = std::make_shared<nano::socket> (*node, boost::none);
auto client2 = std::make_shared<nano::socket> (*node);
client2->async_connect (dst_endpoint, connect_handler);

auto client3 = std::make_shared<nano::socket> (*node, boost::none);
auto client3 = std::make_shared<nano::socket> (*node);
client3->async_connect (dst_endpoint, connect_handler);

auto get_tcp_accept_failures = [&node] () {
Expand All @@ -70,10 +70,10 @@ TEST (socket, max_connections)

server_sockets[0].reset ();

auto client4 = std::make_shared<nano::socket> (*node, boost::none);
auto client4 = std::make_shared<nano::socket> (*node);
client4->async_connect (dst_endpoint, connect_handler);

auto client5 = std::make_shared<nano::socket> (*node, boost::none);
auto client5 = std::make_shared<nano::socket> (*node);
client5->async_connect (dst_endpoint, connect_handler);

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 2);
Expand All @@ -87,13 +87,13 @@ TEST (socket, max_connections)
server_sockets[2].reset ();
ASSERT_EQ (server_sockets.size (), 3);

auto client6 = std::make_shared<nano::socket> (*node, boost::none);
auto client6 = std::make_shared<nano::socket> (*node);
client6->async_connect (dst_endpoint, connect_handler);

auto client7 = std::make_shared<nano::socket> (*node, boost::none);
auto client7 = std::make_shared<nano::socket> (*node);
client7->async_connect (dst_endpoint, connect_handler);

auto client8 = std::make_shared<nano::socket> (*node, boost::none);
auto client8 = std::make_shared<nano::socket> (*node);
client8->async_connect (dst_endpoint, connect_handler);

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 3);
Expand Down Expand Up @@ -132,7 +132,7 @@ TEST (socket, drop_policy)
return true;
});

auto client = std::make_shared<nano::socket> (*node, boost::none);
auto client = std::make_shared<nano::socket> (*node);
nano::transport::channel_tcp channel{ *node, client };
nano::util::counted_completion write_completion (static_cast<unsigned> (total_message_count));

Expand Down Expand Up @@ -244,7 +244,7 @@ TEST (socket, concurrent_writes)
std::vector<std::shared_ptr<nano::socket>> clients;
for (unsigned i = 0; i < client_count; i++)
{
auto client = std::make_shared<nano::socket> (*node, boost::none);
auto client = std::make_shared<nano::socket> (*node);
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000),
[&connection_count_completion] (boost::system::error_code const & ec_a) {
Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void nano::bootstrap_server::stop ()
void nano::bootstrap_server::receive ()
{
// Increase timeout to receive TCP header (idle server socket)
socket->set_timeout (node->network_params.network.idle_timeout);
socket->timeout_set (node->network_params.network.idle_timeout);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, 8, [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
// Set remote_endpoint
Expand All @@ -152,7 +152,7 @@ void nano::bootstrap_server::receive ()
this_l->remote_endpoint = this_l->socket->remote_endpoint ();
}
// Decrease timeout to default
this_l->socket->set_timeout (this_l->node->config.tcp_io_timeout);
this_l->socket->timeout_set (this_l->node->config.tcp_io_timeout);
// Receive header
this_l->receive_header_action (ec, size_a);
});
Expand Down
24 changes: 9 additions & 15 deletions nano/node/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@

#include <limits>

nano::socket::socket (nano::node & node_a, boost::optional<std::chrono::seconds> io_timeout_a) :
nano::socket::socket (nano::node & node_a) :
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
next_deadline{ std::numeric_limits<uint64_t>::max () },
last_completion_time{ 0 },
io_timeout{ io_timeout_a }
io_timeout{ node_a.config.tcp_io_timeout }
{
if (!io_timeout)
{
io_timeout = node_a.config.tcp_io_timeout;
}
}

nano::socket::~socket ()
Expand Down Expand Up @@ -108,7 +104,7 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:

void nano::socket::start_timer ()
{
start_timer (io_timeout.get ());
start_timer (io_timeout);
}

void nano::socket::start_timer (std::chrono::seconds deadline_a)
Expand Down Expand Up @@ -156,12 +152,9 @@ bool nano::socket::has_timed_out () const
return timed_out;
}

void nano::socket::set_timeout (std::chrono::seconds io_timeout_a)
void nano::socket::timeout_set (std::chrono::seconds io_timeout_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, io_timeout_a] () {
this_l->io_timeout = io_timeout_a;
}));
io_timeout = io_timeout_a;
}

void nano::socket::close ()
Expand All @@ -177,7 +170,7 @@ void nano::socket::close_internal ()
{
if (!closed.exchange (true))
{
io_timeout = boost::none;
io_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
Expand All @@ -202,11 +195,12 @@ nano::tcp_endpoint nano::socket::local_endpoint () const
}

nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) :
socket{ node_a, std::chrono::seconds::max () },
socket{ node_a },
acceptor{ node_a.io_ctx },
local{ local_a },
max_inbound_connections{ max_connections_a }
{
io_timeout = std::chrono::seconds::max ();
}

void nano::server_socket::start (boost::system::error_code & ec_a)
Expand Down Expand Up @@ -250,7 +244,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
}

// Prepare new connection
auto new_connection = std::make_shared<nano::socket> (this_l->node, boost::none);
auto new_connection = std::make_shared<nano::socket> (this_l->node);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) {
Expand Down
6 changes: 3 additions & 3 deletions nano/node/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
* @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used.
* @param concurrency write concurrency
*/
explicit socket (nano::node & node, boost::optional<std::chrono::seconds> io_timeout = boost::none);
explicit socket (nano::node & node);
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void (boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>> const &, std::size_t, std::function<void (boost::system::error_code const &, std::size_t)>);
Expand All @@ -58,7 +58,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
/** Returns true if the socket has timed out */
bool has_timed_out () const;
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
void set_timeout (std::chrono::seconds io_timeout_a);
void timeout_set (std::chrono::seconds io_timeout_a);
void start_timer (std::chrono::seconds deadline_a);
bool max () const
{
Expand Down Expand Up @@ -96,7 +96,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
std::atomic<uint64_t> next_deadline;
std::atomic<uint64_t> last_completion_time;
std::atomic<bool> timed_out{ false };
boost::optional<std::chrono::seconds> io_timeout;
std::atomic<std::chrono::seconds> io_timeout;
std::atomic<std::size_t> queue_size{ 0 };

/** Set by close() - completion handlers must check this. This is more reliable than checking
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
node.network.tcp_channels.udp_fallback (endpoint_a);
return;
}
auto socket = std::make_shared<nano::socket> (node, boost::none);
auto socket = std::make_shared<nano::socket> (node);
std::weak_ptr<nano::socket> socket_w (socket);
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
std::weak_ptr<nano::node> node_w (node.shared ());
Expand Down

0 comments on commit b3f6f27

Please sign in to comment.