From 722e3967f6ddcbfbe7873445be1ac9bd527cb3e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 19 Apr 2024 23:21:48 +0200 Subject: [PATCH 1/3] Async utils --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/async.cpp | 52 ++++++++++++++++++++++++++++++ nano/lib/async.hpp | 60 +++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+) create mode 100644 nano/core_test/async.cpp create mode 100644 nano/lib/async.hpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 0a9f771fba..9aadbac052 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -4,6 +4,7 @@ add_executable( fakes/websocket_client.hpp fakes/work_peer.hpp active_transactions.cpp + async.cpp backlog.cpp block.cpp block_store.cpp diff --git a/nano/core_test/async.cpp b/nano/core_test/async.cpp new file mode 100644 index 0000000000..8b27e6ef0c --- /dev/null +++ b/nano/core_test/async.cpp @@ -0,0 +1,52 @@ +#include +#include +#include +#include + +#include + +#include + +#include + +using namespace std::chrono_literals; + +TEST (async, sleep) +{ + auto io_ctx = std::make_shared (); + nano::thread_runner runner{ io_ctx, 1 }; + nano::async::strand strand{ io_ctx->get_executor () }; + + auto fut = asio::co_spawn ( + strand, + [&] () -> asio::awaitable { + co_await nano::async::sleep_for (500ms); + }, + asio::use_future); + + ASSERT_EQ (fut.wait_for (100ms), std::future_status::timeout); + ASSERT_EQ (fut.wait_for (1s), std::future_status::ready); +} + +TEST (async, cancellation) +{ + auto io_ctx = std::make_shared (); + nano::thread_runner runner{ io_ctx, 1 }; + nano::async::strand strand{ io_ctx->get_executor () }; + + nano::async::cancellation cancellation{ strand }; + + auto fut = asio::co_spawn ( + strand, + [&] () -> asio::awaitable { + co_await nano::async::sleep_for (10s); + }, + asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); + + ASSERT_EQ (fut.wait_for (500ms), std::future_status::timeout); + + cancellation.emit (); + + ASSERT_EQ (fut.wait_for (500ms), std::future_status::ready); + ASSERT_NO_THROW (fut.get ()); +} \ No newline at end of file diff --git a/nano/lib/async.hpp b/nano/lib/async.hpp new file mode 100644 index 0000000000..ce4a1ef4cf --- /dev/null +++ b/nano/lib/async.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include + +#include + +namespace asio = boost::asio; + +namespace nano::async +{ +using strand = asio::strand; + +inline asio::awaitable setup_this_coro () +{ + co_await asio::this_coro::throw_if_cancelled (false); +} + +inline asio::awaitable sleep_for (auto duration) +{ + asio::steady_timer timer{ co_await asio::this_coro::executor }; + timer.expires_after (duration); + boost::system::error_code ec; // Swallow potential error from coroutine cancellation + co_await timer.async_wait (asio::redirect_error (asio::use_awaitable, ec)); + debug_assert (!ec || ec == asio::error::operation_aborted); +} + +/** + * A cancellation signal that can be emitted from any thread. + * I follows the same semantics as asio::cancellation_signal. + */ +class cancellation +{ +public: + explicit cancellation (nano::async::strand & strand) : + strand{ strand } + { + } + + void emit (asio::cancellation_type type = asio::cancellation_type::all) + { + asio::dispatch (strand, asio::use_future ([this, type] () { + signal.emit (type); + })) + .wait (); + } + + auto slot () + { + // Ensure that the slot is only connected once + debug_assert (std::exchange (slotted, true) == false); + return signal.slot (); + } + +private: + nano::async::strand & strand; + asio::cancellation_signal signal; + + bool slotted{ false }; +}; +} \ No newline at end of file From f6f15ca8ad6033c1828ccc0caea691146fd731e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 18 Apr 2024 12:00:32 +0200 Subject: [PATCH 2/3] Use coroutines --- nano/core_test/node.cpp | 3 +- nano/lib/async.hpp | 5 -- nano/node/transport/tcp_listener.cpp | 122 ++++++++++++++------------- nano/node/transport/tcp_listener.hpp | 17 ++-- 4 files changed, 75 insertions(+), 72 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 980841a036..7ade6ec18a 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -73,10 +73,9 @@ TEST (node, work_generate) TEST (node, block_store_path_failure) { nano::test::system system; - auto io_ctx = std::make_shared (); auto path (nano::unique_path ()); nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits::max () }; - auto node (std::make_shared (io_ctx, system.get_available_port (), path, pool)); + auto node (std::make_shared (system.io_ctx, system.get_available_port (), path, pool)); system.register_node (node); ASSERT_TRUE (node->wallets.items.empty ()); } diff --git a/nano/lib/async.hpp b/nano/lib/async.hpp index ce4a1ef4cf..0dd4751d59 100644 --- a/nano/lib/async.hpp +++ b/nano/lib/async.hpp @@ -10,11 +10,6 @@ namespace nano::async { using strand = asio::strand; -inline asio::awaitable setup_this_coro () -{ - co_await asio::this_coro::throw_if_cancelled (false); -} - inline asio::awaitable sleep_for (auto duration) { asio::steady_timer timer{ co_await asio::this_coro::executor }; diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 7cb676fb73..a27e0a6ae2 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -21,8 +21,9 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ logger{ node_a.logger }, port{ port_a }, max_inbound_connections{ max_inbound_connections }, - acceptor{ node_a.io_ctx }, - local{ asio::ip::tcp::endpoint{ asio::ip::address_v6::any (), port_a } } + strand{ node_a.io_ctx.get_executor () }, + cancellation{ strand }, + acceptor{ strand } { connection_accepted.add ([this] (auto const & socket, auto const & server) { node.observers.socket_accepted.notify (*socket); @@ -32,49 +33,66 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ nano::transport::tcp_listener::~tcp_listener () { // Thread should be stopped before destruction - debug_assert (!thread.joinable ()); + debug_assert (!cleanup_thread.joinable ()); + debug_assert (!future.valid () || future.wait_for (0s) == std::future_status::ready); } void nano::transport::tcp_listener::start () { - debug_assert (!thread.joinable ()); debug_assert (!cleanup_thread.joinable ()); + debug_assert (!future.valid ()); try { - acceptor.open (local.protocol ()); + asio::ip::tcp::endpoint target{ asio::ip::address_v6::any (), port }; + + acceptor.open (target.protocol ()); acceptor.set_option (asio::ip::tcp::acceptor::reuse_address (true)); - acceptor.bind (local); + acceptor.bind (target); acceptor.listen (asio::socket_base::max_listen_connections); + { + std::lock_guard lock{ mutex }; + local = acceptor.local_endpoint (); + } + logger.info (nano::log::type::tcp_listener, "Listening for incoming connections on: {}", fmt::streamed (acceptor.local_endpoint ())); } catch (boost::system::system_error const & ex) { logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ex.what (), port); - - throw std::runtime_error (ex.code ().message ()); + throw; } - thread = std::thread ([this] { - nano::thread_role::set (nano::thread_role::name::tcp_listener); + future = asio::co_spawn ( + strand, [this] () -> asio::awaitable { try { - logger.debug (nano::log::type::tcp_listener, "Starting acceptor thread"); - run (); - logger.debug (nano::log::type::tcp_listener, "Stopped acceptor thread"); + logger.debug (nano::log::type::tcp_listener, "Starting acceptor"); + + try + { + co_await run (); + } + catch (boost::system::system_error const & ex) + { + // Operation aborted is expected when cancelling the acceptor + debug_assert (ex.code () == asio::error::operation_aborted); + } + debug_assert (strand.running_in_this_thread ()); + + logger.debug (nano::log::type::tcp_listener, "Stopped acceptor"); } catch (std::exception const & ex) { logger.critical (nano::log::type::tcp_listener, "Error: {}", ex.what ()); - release_assert (false); // Should be handled earlier + release_assert (false); // Unexpected error } catch (...) { logger.critical (nano::log::type::tcp_listener, "Unknown error"); - release_assert (false); // Should be handled earlier - } - }); + release_assert (false); // Unexpected error + } }, asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); cleanup_thread = std::thread ([this] { nano::thread_role::set (nano::thread_role::name::tcp_listener); @@ -87,28 +105,31 @@ void nano::transport::tcp_listener::stop () debug_assert (!stopped); logger.info (nano::log::type::tcp_listener, "Stopping listening for incoming connections and closing all sockets..."); + { nano::lock_guard lock{ mutex }; stopped = true; - - boost::system::error_code ec; - acceptor.close (ec); // Best effort to close the acceptor, ignore errors - if (ec) - { - logger.error (nano::log::type::tcp_listener, "Error while closing acceptor: {}", ec.message ()); - } + local = {}; } condition.notify_all (); - if (thread.joinable ()) + if (future.valid ()) { - thread.join (); + cancellation.emit (); + future.wait (); } if (cleanup_thread.joinable ()) { cleanup_thread.join (); } + boost::system::error_code ec; + acceptor.close (ec); // Best effort to close the acceptor, ignore errors + if (ec) + { + logger.error (nano::log::type::tcp_listener, "Error while closing acceptor: {}", ec.message ()); + } + decltype (connections) connections_l; { nano::lock_guard lock{ mutex }; @@ -157,23 +178,18 @@ void nano::transport::tcp_listener::cleanup () }); } -void nano::transport::tcp_listener::run () +asio::awaitable nano::transport::tcp_listener::run () { - nano::unique_lock lock{ mutex }; + debug_assert (strand.running_in_this_thread ()); + while (!stopped && acceptor.is_open ()) { - lock.unlock (); - - wait_available_slots (); - - if (stopped) - { - return; - } + co_await wait_available_slots (); try { - auto result = accept_one (); + auto socket = co_await accept_socket (); + auto result = accept_one (std::move (socket)); if (result != accept_result::accepted) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); @@ -186,10 +202,8 @@ void nano::transport::tcp_listener::run () logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error accepting incoming connection: {}", ex.what ()); } - lock.lock (); - // Sleep for a while to prevent busy loop - condition.wait_for (lock, 10ms, [this] () { return stopped.load (); }); + co_await nano::async::sleep_for (10ms); } if (!stopped) { @@ -198,20 +212,15 @@ void nano::transport::tcp_listener::run () } } -asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket () +asio::awaitable nano::transport::tcp_listener::accept_socket () { - std::future future; - { - nano::unique_lock lock{ mutex }; - future = acceptor.async_accept (asio::use_future); - } - future.wait (); - return future.get (); + debug_assert (strand.running_in_this_thread ()); + + co_return co_await acceptor.async_accept (asio::use_awaitable); } -auto nano::transport::tcp_listener::accept_one () -> accept_result +auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket) -> accept_result { - auto raw_socket = accept_socket (); auto const remote_endpoint = raw_socket.remote_endpoint (); auto const local_endpoint = raw_socket.local_endpoint (); @@ -255,7 +264,7 @@ auto nano::transport::tcp_listener::accept_one () -> accept_result return accept_result::accepted; } -void nano::transport::tcp_listener::wait_available_slots () +asio::awaitable nano::transport::tcp_listener::wait_available_slots () const { nano::interval log_interval; while (connection_count () >= max_inbound_connections && !stopped) @@ -266,7 +275,7 @@ void nano::transport::tcp_listener::wait_available_slots () connection_count (), max_inbound_connections); } - std::this_thread::sleep_for (100ms); + co_await nano::async::sleep_for (100ms); } } @@ -367,14 +376,7 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address co asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const { nano::lock_guard lock{ mutex }; - if (!stopped) - { - return { asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port () }; - } - else - { - return { asio::ip::address_v6::loopback (), 0 }; - } + return { asio::ip::address_v6::loopback (), local.port () }; } std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 5bad7a7dd4..7c8d6fe5e6 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -1,12 +1,15 @@ #pragma once +#include #include +#include #include #include #include #include +#include #include #include @@ -54,10 +57,11 @@ class tcp_listener final nano::logger & logger; private: - void run (); + asio::awaitable run (); + asio::awaitable wait_available_slots () const; + void run_cleanup (); void cleanup (); - void wait_available_slots (); enum class accept_result { @@ -68,9 +72,9 @@ class tcp_listener final excluded, }; - accept_result accept_one (); + accept_result accept_one (asio::ip::tcp::socket); accept_result check_limits (asio::ip::address const & ip); - asio::ip::tcp::socket accept_socket (); + asio::awaitable accept_socket (); size_t count_per_ip (asio::ip::address const & ip) const; size_t count_per_subnetwork (asio::ip::address const & ip) const; @@ -103,13 +107,16 @@ class tcp_listener final // clang-format on ordered_connections connections; + nano::async::strand strand; + nano::async::cancellation cancellation; + asio::ip::tcp::acceptor acceptor; asio::ip::tcp::endpoint local; std::atomic stopped; nano::condition_variable condition; mutable nano::mutex mutex; - std::thread thread; + std::future future; std::thread cleanup_thread; }; } \ No newline at end of file From 76a5211adfa0d0400441de4a7d48ffc204b9ff1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 24 Apr 2024 07:28:39 +0200 Subject: [PATCH 3/3] Typo --- nano/lib/async.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/lib/async.hpp b/nano/lib/async.hpp index 0dd4751d59..c1031ec527 100644 --- a/nano/lib/async.hpp +++ b/nano/lib/async.hpp @@ -21,7 +21,7 @@ inline asio::awaitable sleep_for (auto duration) /** * A cancellation signal that can be emitted from any thread. - * I follows the same semantics as asio::cancellation_signal. + * It follows the same semantics as asio::cancellation_signal. */ class cancellation {