Skip to content

Commit

Permalink
This converts the socket IO timer to a timer started/stopped with raii.
Browse files Browse the repository at this point in the history
Each socket has a deadline_next value that specifies when the next i/o deadline is to occur.
Deadlines are either: a timeout for an I/O operation or, a timeout for inactivity. When an I/O operation completes, a deadline for inactivity is started.
Each creation of a timer stores a timestamp and sets the deadline in the socket to be the same value. Upon timer destruction, iff the socket value equals the deadline timer, we'll set a new timeout deadline, otherwise we have overlapping I/O operations and another timer will do the deadline setting.
  • Loading branch information
clemahieu committed Sep 22, 2020
1 parent a11c539 commit fe0f654
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 65 deletions.
36 changes: 34 additions & 2 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ TEST (socket, drop_policy)
return true;
});

auto client = std::make_shared<nano::socket> (*node, boost::none);
auto client = std::make_shared<nano::socket> (*node);
nano::transport::channel_tcp channel{ *node, client };
nano::util::counted_completion write_completion (total_message_count);

Expand Down Expand Up @@ -145,7 +145,7 @@ TEST (socket, concurrent_writes)
std::vector<std::shared_ptr<nano::socket>> clients;
for (unsigned i = 0; i < client_count; i++)
{
auto client = std::make_shared<nano::socket> (*node, boost::none);
auto client = std::make_shared<nano::socket> (*node);
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000),
[&connection_count_completion](boost::system::error_code const & ec_a) {
Expand Down Expand Up @@ -202,3 +202,35 @@ TEST (socket, concurrent_writes)
t.join ();
}
}

TEST (socket_timer, construct_destruct)
{
nano::system system (1);
auto socket = std::make_shared<nano::socket> (*system.nodes[0]);
nano::socket::timer timer{ socket };
}

TEST (socket_timer, construct_release)
{
nano::system system (1);
auto socket = std::make_shared<nano::socket> (*system.nodes[0]);
nano::socket::timer timer{ socket };
timer.release ();
}

TEST (socket_timer, construct_move_destruct)
{
nano::system system (1);
auto socket = std::make_shared<nano::socket> (*system.nodes[0]);
nano::socket::timer timer0{ socket };
nano::socket::timer timer1{ std::move (timer0) };
}

TEST (socket_timer, construct_move_release)
{
nano::system system (1);
auto socket = std::make_shared<nano::socket> (*system.nodes[0]);
nano::socket::timer timer0{ socket };
nano::socket::timer timer1{ std::move (timer0) };
timer1.release ();
}
1 change: 0 additions & 1 deletion nano/node/bootstrap/bootstrap_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ void nano::bootstrap_connections::pool_connection (std::shared_ptr<nano::bootstr
auto const & socket_l = client_a->socket;
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
{
socket_l->start_timer (node.network_params.node.idle_timeout);
// Push into idle deque
if (!push_front)
{
Expand Down
4 changes: 0 additions & 4 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,13 @@ void nano::bootstrap_server::stop ()

void nano::bootstrap_server::receive ()
{
// Increase timeout to receive TCP header (idle server socket)
socket->set_timeout (node->network_params.node.idle_timeout);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, 8, [this_l](boost::system::error_code const & ec, size_t size_a) {
// Set remote_endpoint
if (this_l->remote_endpoint.port () == 0)
{
this_l->remote_endpoint = this_l->socket->remote_endpoint ();
}
// Decrease timeout to default
this_l->socket->set_timeout (this_l->node->config.tcp_io_timeout);
// Receive header
this_l->receive_header_action (ec, size_a);
});
Expand Down
91 changes: 43 additions & 48 deletions nano/node/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,11 @@

#include <limits>

nano::socket::socket (nano::node & node_a, boost::optional<std::chrono::seconds> io_timeout_a) :
nano::socket::socket (nano::node & node_a) :
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
next_deadline{ std::numeric_limits<uint64_t>::max () },
last_completion_time{ 0 },
io_timeout{ io_timeout_a }
node{ node_a }
{
if (!io_timeout)
{
io_timeout = node_a.config.tcp_io_timeout;
}
}

nano::socket::~socket ()
Expand All @@ -31,11 +24,9 @@ void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::fu
{
checkup ();
auto this_l (shared_from_this ());
start_timer ();
this_l->tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand,
[this_l, callback_a, endpoint_a](boost::system::error_code const & ec) {
this_l->stop_timer ();
[this_l, callback_a, endpoint_a, timer = socket::timer{ shared_from_this () }](boost::system::error_code const & ec) mutable {
this_l->remote = endpoint_a;
callback_a (ec);
}));
Expand All @@ -48,13 +39,11 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, s
auto this_l (shared_from_this ());
if (!closed)
{
start_timer ();
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, size_a, this_l]() {
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
[this_l, buffer_a, callback_a, timer = socket::timer{ this_l->shared_from_this () }](boost::system::error_code const & ec, size_t size_a) mutable {
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
this_l->stop_timer ();
callback_a (ec, size_a);
}));
}));
Expand All @@ -76,13 +65,11 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l = shared_from_this ()]() {
if (!this_l->closed)
{
this_l->start_timer ();
nano::async_write (this_l->tcp_socket, buffer_a,
boost::asio::bind_executor (this_l->strand,
[buffer_a, callback_a, this_l](boost::system::error_code ec, std::size_t size_a) {
[buffer_a, callback_a, this_l, timer = socket::timer{ this_l->shared_from_this () }](boost::system::error_code ec, std::size_t size_a) mutable {
--this_l->queue_size;
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (callback_a)
{
callback_a (ec, size_a);
Expand All @@ -106,29 +93,14 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
}
}

void nano::socket::start_timer ()
{
start_timer (io_timeout.get ());
}

void nano::socket::start_timer (std::chrono::seconds deadline_a)
{
next_deadline = deadline_a.count ();
}

void nano::socket::stop_timer ()
{
last_completion_time = nano::seconds_since_epoch ();
}

void nano::socket::checkup ()
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 2), [this_w]() {
if (auto this_l = this_w.lock ())
{
uint64_t now (nano::seconds_since_epoch ());
if (this_l->next_deadline != std::numeric_limits<uint64_t>::max () && now - this_l->last_completion_time > this_l->next_deadline)
auto now = std::chrono::steady_clock::now ();
if (this_l->deadline_next.load () < now.time_since_epoch ().count ())
{
if (this_l->node.config.logging.network_timeout_logging ())
{
Expand All @@ -151,17 +123,43 @@ void nano::socket::checkup ()
});
}

bool nano::socket::has_timed_out () const
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
void nano::socket::deadline_start ()
{
return timed_out;
debug_assert (deadline_next.load () == std::numeric_limits<uint64_t>::max ());
deadline_next = (std::chrono::steady_clock::now () + std::chrono::seconds (2)).time_since_epoch ().count ();
}

void nano::socket::set_timeout (std::chrono::seconds io_timeout_a)
nano::socket::timer::timer (std::shared_ptr<nano::socket> socket_a) :
socket{ socket_a },
idle{ socket_a->node.network_params.node.idle_timeout },
value{ static_cast<uint64_t> ((std::chrono::steady_clock::now () + socket_a->node.config.tcp_io_timeout).time_since_epoch ().count ()) }
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, io_timeout_a]() {
this_l->io_timeout = io_timeout_a;
}));
socket_a->deadline_next = value;
}

nano::socket::timer::timer (nano::socket::timer && other_a) :
socket{ other_a.socket },
idle{ other_a.idle },
value{ other_a.value }
{
other_a.value = std::numeric_limits<uint64_t>::max ();
}

nano::socket::timer::~timer ()
{
release ();
}

void nano::socket::timer::release ()
{
socket->deadline_next.compare_exchange_strong (value, (std::chrono::steady_clock::now () + idle).time_since_epoch ().count ());
}

bool nano::socket::has_timed_out () const
{
return timed_out;
}

void nano::socket::close ()
Expand All @@ -177,7 +175,6 @@ void nano::socket::close_internal ()
{
if (!closed.exchange (true))
{
io_timeout = boost::none;
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
Expand All @@ -197,7 +194,7 @@ nano::tcp_endpoint nano::socket::remote_endpoint () const
}

nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a) :
socket{ node_a, std::chrono::seconds::max () },
socket{ node_a },
acceptor{ node_a.io_ctx },
local{ local_a },
max_inbound_connections{ max_connections_a }
Expand Down Expand Up @@ -243,7 +240,7 @@ void nano::server_socket::on_connection (std::function<bool(std::shared_ptr<nano
if (this_l->connections.size () < this_l->max_inbound_connections)
{
// Prepare new connection
auto new_connection = std::make_shared<nano::socket> (this_l->node, boost::none);
auto new_connection = std::make_shared<nano::socket> (this_l->node);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, callback_a](boost::system::error_code const & ec_a) {
Expand All @@ -252,10 +249,8 @@ void nano::server_socket::on_connection (std::function<bool(std::shared_ptr<nano
{
if (!ec_a)
{
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->deadline_start ();
new_connection->checkup ();
new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections.push_back (new_connection);
}
Expand Down
29 changes: 20 additions & 9 deletions nano/node/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class socket : public std::enable_shared_from_this<nano::socket>
* @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used.
* @param concurrency write concurrency
*/
explicit socket (nano::node & node, boost::optional<std::chrono::seconds> io_timeout = boost::none);
explicit socket (nano::node & node);
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void(boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void(boost::system::error_code const &, size_t)>);
Expand All @@ -49,9 +49,6 @@ class socket : public std::enable_shared_from_this<nano::socket>
boost::asio::ip::tcp::endpoint remote_endpoint () const;
/** Returns true if the socket has timed out */
bool has_timed_out () const;
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
void set_timeout (std::chrono::seconds io_timeout_a);
void start_timer (std::chrono::seconds deadline_a);
bool max () const
{
return queue_size >= queue_size_max;
Expand All @@ -77,22 +74,36 @@ class socket : public std::enable_shared_from_this<nano::socket>
/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;

std::atomic<uint64_t> next_deadline;
std::atomic<uint64_t> last_completion_time;
std::atomic<uint64_t> deadline_next{ std::numeric_limits<uint64_t>::max () };
std::atomic<bool> timed_out{ false };
boost::optional<std::chrono::seconds> io_timeout;
std::atomic<size_t> queue_size{ 0 };

/** Set by close() - completion handlers must check this. This is more reliable than checking
error codes as the OS may have already completed the async operation. */
std::atomic<bool> closed{ false };
void close_internal ();
void start_timer ();
void stop_timer ();
void checkup ();
void deadline_start ();

public:
static size_t constexpr queue_size_max = 128;
class timer
{
// Non-copyable
timer (nano::socket::timer &) = delete;
timer & operator= (nano::socket::timer const &) = delete;

public:
timer (std::shared_ptr<nano::socket> socket_a);
timer (nano::socket::timer && other_a);
~timer ();
void release ();

private:
std::shared_ptr<nano::socket> socket;
std::chrono::seconds idle;
uint64_t value;
};
};

/** Socket class for TCP servers */
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
node.network.tcp_channels.udp_fallback (endpoint_a, callback_a);
return;
}
auto socket = std::make_shared<nano::socket> (node, boost::none);
auto socket = std::make_shared<nano::socket> (node);
std::weak_ptr<nano::socket> socket_w (socket);
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
std::weak_ptr<nano::node> node_w (node.shared ());
Expand Down

0 comments on commit fe0f654

Please sign in to comment.