From 9baeff2265f98fe8f358c89718b8c3521e6fc83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 19 Mar 2024 16:01:37 +0100 Subject: [PATCH] Store `io_context` as shared pointer --- nano/core_test/network.cpp | 10 ++--- nano/core_test/node.cpp | 8 ++-- nano/core_test/socket.cpp | 20 ++++----- nano/lib/thread_runner.cpp | 12 +++-- nano/lib/thread_runner.hpp | 6 ++- nano/load_test/entry.cpp | 6 ++- nano/nano_node/daemon.cpp | 22 ++++++--- nano/nano_node/entry.cpp | 8 ++-- nano/nano_rpc/entry.cpp | 14 ++++-- nano/nano_wallet/entry.cpp | 3 +- nano/node/ipc/ipc_server.cpp | 9 ++-- nano/node/node.cpp | 9 ++-- nano/node/node.hpp | 7 +-- nano/rpc/rpc.cpp | 9 ++-- nano/rpc/rpc.hpp | 14 +++--- nano/rpc_test/rpc.cpp | 10 ++--- nano/rpc_test/rpc_context.cpp | 4 +- nano/slow_test/bootstrap.cpp | 2 +- nano/test_common/system.cpp | 85 ++++++++++++++++++----------------- nano/test_common/system.hpp | 2 +- 20 files changed, 148 insertions(+), 112 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index c7152c6641..2dcea59d2f 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -21,14 +21,14 @@ using namespace std::chrono_literals; TEST (network, tcp_connection) { nano::test::system system; - boost::asio::ip::tcp::acceptor acceptor (system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); auto port = system.get_available_port (); boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), port); acceptor.open (endpoint.protocol ()); acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); acceptor.bind (endpoint); acceptor.listen (); - boost::asio::ip::tcp::socket incoming (system.io_ctx); + boost::asio::ip::tcp::socket incoming (*system.io_ctx); std::atomic done1 (false); std::string message1; acceptor.async_accept (incoming, [&done1, &message1] (boost::system::error_code const & ec_a) { @@ -39,7 +39,7 @@ TEST (network, tcp_connection) } done1 = true; }); - boost::asio::ip::tcp::socket connector (system.io_ctx); + boost::asio::ip::tcp::socket connector (*system.io_ctx); std::atomic done2 (false); std::string message2; connector.async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), acceptor.local_endpoint ().port ()), @@ -538,13 +538,13 @@ TEST (network, ipv6_bind_send_ipv4) std::array bytes1{}; std::atomic finish1{ false }; nano::endpoint endpoint3; - boost::asio::ip::udp::socket socket1 (system.io_ctx, endpoint1); + boost::asio::ip::udp::socket socket1 (*system.io_ctx, endpoint1); socket1.async_receive_from (boost::asio::buffer (bytes1.data (), bytes1.size ()), endpoint3, [&finish1] (boost::system::error_code const & error, size_t size_a) { ASSERT_FALSE (error); ASSERT_EQ (16, size_a); finish1 = true; }); - boost::asio::ip::udp::socket socket2 (system.io_ctx, endpoint2); + boost::asio::ip::udp::socket socket2 (*system.io_ctx, endpoint2); nano::endpoint endpoint5 (boost::asio::ip::address_v4::loopback (), socket1.local_endpoint ().port ()); nano::endpoint endpoint6 (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4::loopback ()), socket2.local_endpoint ().port ()); socket2.async_send_to (boost::asio::buffer (std::array{}, 16), endpoint5, [] (boost::system::error_code const & error, size_t size_a) { diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index cf81a51bdb..03c05211b1 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -36,7 +36,7 @@ TEST (node, stop) nano::test::system system (1); ASSERT_NE (system.nodes[0]->wallets.items.end (), system.nodes[0]->wallets.items.begin ()); system.nodes[0]->stop (); - system.io_ctx.run (); + system.io_ctx->run (); ASSERT_TRUE (true); } @@ -68,10 +68,10 @@ TEST (node, work_generate) TEST (node, block_store_path_failure) { nano::test::system system; - auto service (std::make_shared ()); + auto io_ctx = std::make_shared (); auto path (nano::unique_path ()); nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits::max () }; - auto node (std::make_shared (*service, system.get_available_port (), path, pool)); + auto node (std::make_shared (io_ctx, system.get_available_port (), path, pool)); ASSERT_TRUE (node->wallets.items.empty ()); node->stop (); } @@ -97,7 +97,7 @@ TEST (node_DeathTest, readonly_block_store_not_exist) TEST (node, password_fanout) { nano::test::system system; - boost::asio::io_context io_ctx; + auto io_ctx = std::make_shared (); auto path (nano::unique_path ()); nano::node_config config; config.peering_port = system.get_available_port (); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 1ec356391b..11a0862b10 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -406,7 +406,7 @@ TEST (socket, drop_policy) nano::inactive_node inactivenode (nano::unique_path (), node_flags); auto node = inactivenode.node; - nano::thread_runner runner (node->io_ctx, 1); + nano::thread_runner runner (node->io_ctx_shared, 1); std::vector> connections; @@ -469,7 +469,7 @@ TEST (socket, concurrent_writes) // This gives more realistic execution than using system#poll, allowing writes to // queue up and drain concurrently. - nano::thread_runner runner (node->io_ctx, 1); + nano::thread_runner runner (node->io_ctx_shared, 1); constexpr size_t max_connections = 4; constexpr size_t client_count = max_connections; @@ -622,13 +622,13 @@ TEST (socket_timeout, read) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and create a newsock and do not send any data - boost::asio::ip::tcp::socket newsock (system.io_ctx); + boost::asio::ip::tcp::socket newsock (*system.io_ctx); acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); }); @@ -668,13 +668,13 @@ TEST (socket_timeout, write) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and create a newsock and do not receive any data - boost::asio::ip::tcp::socket newsock (system.io_ctx); + boost::asio::ip::tcp::socket newsock (*system.io_ctx); acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); }); @@ -719,13 +719,13 @@ TEST (socket_timeout, read_overlapped) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and send one byte only - boost::asio::ip::tcp::socket newsock (system.io_ctx); + boost::asio::ip::tcp::socket newsock (*system.io_ctx); acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); @@ -777,13 +777,13 @@ TEST (socket_timeout, write_overlapped) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and read 2 bytes only - boost::asio::ip::tcp::socket newsock (system.io_ctx); + boost::asio::ip::tcp::socket newsock (*system.io_ctx); auto buffer = std::make_shared> (1); acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); diff --git a/nano/lib/thread_runner.cpp b/nano/lib/thread_runner.cpp index 26fa307b24..363a217952 100644 --- a/nano/lib/thread_runner.cpp +++ b/nano/lib/thread_runner.cpp @@ -10,17 +10,20 @@ * thread_runner */ -nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned num_threads, const nano::thread_role::name thread_role_a) : - io_guard{ boost::asio::make_work_guard (io_ctx_a) }, +nano::thread_runner::thread_runner (std::shared_ptr io_ctx_a, unsigned num_threads, const nano::thread_role::name thread_role_a) : + io_ctx{ io_ctx_a }, + io_guard{ boost::asio::make_work_guard (*io_ctx_a) }, role{ thread_role_a } { + debug_assert (io_ctx != nullptr); + for (auto i (0u); i < num_threads; ++i) { - threads.emplace_back (nano::thread_attributes::get_default (), [this, &io_ctx_a] () { + threads.emplace_back (nano::thread_attributes::get_default (), [this] () { nano::thread_role::set (role); try { - run (io_ctx_a); + run (*io_ctx); } catch (std::exception const & ex) { @@ -78,6 +81,7 @@ void nano::thread_runner::join () i.join (); } } + io_ctx.reset (); } void nano::thread_runner::stop_event_processing () diff --git a/nano/lib/thread_runner.hpp b/nano/lib/thread_runner.hpp index 7fbba2e082..975beab3f2 100644 --- a/nano/lib/thread_runner.hpp +++ b/nano/lib/thread_runner.hpp @@ -12,18 +12,20 @@ namespace nano class thread_runner final { public: - thread_runner (boost::asio::io_context &, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io); + thread_runner (std::shared_ptr, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io); ~thread_runner (); /** Tells the IO context to stop processing events.*/ void stop_event_processing (); + /** Wait for IO threads to complete */ void join (); private: + std::shared_ptr io_ctx; + boost::asio::executor_work_guard io_guard; nano::thread_role::name const role; std::vector threads; - boost::asio::executor_work_guard io_guard; private: void run (boost::asio::io_context &); diff --git a/nano/load_test/entry.cpp b/nano/load_test/entry.cpp index 7a012fdeb0..c47989fd63 100644 --- a/nano/load_test/entry.cpp +++ b/nano/load_test/entry.cpp @@ -592,7 +592,8 @@ int main (int argc, char * const * argv) std::this_thread::sleep_for (std::chrono::seconds (7)); std::cout << "Connecting nodes..." << std::endl; - boost::asio::io_context ioc; + std::shared_ptr ioc_shared = std::make_shared (); + boost::asio::io_context & ioc{ *ioc_shared }; debug_assert (!nano::signal_handler_impl); nano::signal_handler_impl = [&ioc] () { @@ -715,7 +716,8 @@ int main (int argc, char * const * argv) // Stop main node stop_rpc (ioc, primary_node_results); }); - nano::thread_runner runner (ioc, simultaneous_process_calls); + + nano::thread_runner runner (ioc_shared, simultaneous_process_calls); t.join (); runner.join (); diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index 2becd930a5..879bc5e2d1 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -98,7 +98,8 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag config.node.websocket_config.tls_config = tls_config; } - boost::asio::io_context io_ctx; + std::shared_ptr io_ctx = std::make_shared (); + auto opencl = nano::opencl_work::create (config.opencl_enable, config.opencl, logger, config.node.network_params.work); nano::opencl_work_func_t opencl_work_func; if (opencl) @@ -132,7 +133,7 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag config.node.peering_port = network_params.network.default_node_port; } - auto node (std::make_shared (io_ctx, data_path, config.node, opencl_work, flags)); + auto node = std::make_shared (io_ctx, data_path, config.node, opencl_work, flags); if (!node->init_error ()) { auto network_label = node->network_params.network.get_current_network_as_string (); @@ -165,10 +166,14 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag } rpc_config.tls_config = tls_config; - rpc_handler = std::make_unique (*node, ipc_server, config.rpc, [&ipc_server, &workers = node->workers, &io_ctx] () { + rpc_handler = std::make_unique (*node, ipc_server, config.rpc, + [&ipc_server, &workers = node->workers, io_ctx_w = std::weak_ptr{ io_ctx }] () { ipc_server.stop (); - workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [&io_ctx] () { - io_ctx.stop (); + workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [io_ctx_w] () { + if (auto io_ctx_l = io_ctx_w.lock ()) + { + io_ctx_l->stop (); + } }); }); rpc = nano::get_rpc (io_ctx, rpc_config, *rpc_handler); @@ -189,10 +194,13 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag } debug_assert (!nano::signal_handler_impl); - nano::signal_handler_impl = [this, &io_ctx] () { + nano::signal_handler_impl = [this, io_ctx_w = std::weak_ptr{ io_ctx }] () { logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping..."); - io_ctx.stop (); + if (auto io_ctx_l = io_ctx_w.lock ()) + { + io_ctx_l->stop (); + } sig_int_or_term = 1; }; diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index 0b3c67a56d..fa4534e6ec 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -1129,8 +1129,8 @@ int main (int argc, char * const * argv) } } std::cout << boost::str (boost::format ("Starting generating %1% blocks...\n") % (count * 2)); - boost::asio::io_context io_ctx1; - boost::asio::io_context io_ctx2; + auto io_ctx1 = std::make_shared (); + auto io_ctx2 = std::make_shared (); nano::work_pool work{ network_params.network, std::numeric_limits::max () }; auto path1 (nano::unique_path ()); auto path2 (nano::unique_path ()); @@ -1283,8 +1283,8 @@ int main (int argc, char * const * argv) auto end (std::chrono::high_resolution_clock::now ()); auto time (std::chrono::duration_cast (end - begin).count ()); std::cout << boost::str (boost::format ("%|1$ 12d| us \n%2% frontiers per second\n") % time % ((count + 1) * 1000000 / time)); - io_ctx1.stop (); - io_ctx2.stop (); + io_ctx1->stop (); + io_ctx2->stop (); runner1.join (); runner2.join (); node1->stop (); diff --git a/nano/nano_rpc/entry.cpp b/nano/nano_rpc/entry.cpp index 65e1de81e9..b198d20820 100644 --- a/nano/nano_rpc/entry.cpp +++ b/nano/nano_rpc/entry.cpp @@ -49,17 +49,23 @@ void run (std::filesystem::path const & data_path, std::vector cons rpc_config.tls_config = tls_config; } - boost::asio::io_context io_ctx; + std::shared_ptr io_ctx = std::make_shared (); + nano::signal_manager sigman; try { - nano::ipc_rpc_processor ipc_rpc_processor (io_ctx, rpc_config); + nano::ipc_rpc_processor ipc_rpc_processor (*io_ctx, rpc_config); auto rpc = nano::get_rpc (io_ctx, rpc_config, ipc_rpc_processor); rpc->start (); debug_assert (!nano::signal_handler_impl); - nano::signal_handler_impl = [&io_ctx] () { - io_ctx.stop (); + nano::signal_handler_impl = [io_ctx_w = std::weak_ptr{ io_ctx }] () { + logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping..."); + + if (auto io_ctx_l = io_ctx_w.lock ()) + { + io_ctx_l->stop (); + } sig_int_or_term = 1; }; diff --git a/nano/nano_wallet/entry.cpp b/nano/nano_wallet/entry.cpp index 69d8262aa7..5c3a0a29af 100644 --- a/nano/nano_wallet/entry.cpp +++ b/nano/nano_wallet/entry.cpp @@ -122,7 +122,8 @@ int run_wallet (QApplication & application, int argc, char * const * argv, std:: config.node.websocket_config.tls_config = tls_config; } - boost::asio::io_context io_ctx; + std::shared_ptr io_ctx = std::make_shared (); + nano::thread_runner runner (io_ctx, config.node.io_threads); std::shared_ptr node; diff --git a/nano/node/ipc/ipc_server.cpp b/nano/node/ipc/ipc_server.cpp index 781f4051ac..3d13d6676a 100644 --- a/nano/node/ipc/ipc_server.cpp +++ b/nano/node/ipc/ipc_server.cpp @@ -463,12 +463,13 @@ class socket_transport : public nano::ipc::transport { public: socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) : - server (server_a), config_transport (config_transport_a) + server (server_a), + config_transport (config_transport_a) { // Using a per-transport event dispatcher? if (concurrency_a > 0) { - io_ctx = std::make_unique (); + io_ctx = std::make_shared (); } boost::asio::socket_base::reuse_address option (true); @@ -482,7 +483,7 @@ class socket_transport : public nano::ipc::transport // A separate io_context for domain sockets may facilitate better performance on some systems. if (concurrency_a > 0) { - runner = std::make_unique (*io_ctx, static_cast (concurrency_a)); + runner = std::make_unique (io_ctx, static_cast (concurrency_a)); } } @@ -544,7 +545,7 @@ class socket_transport : public nano::ipc::transport nano::ipc::ipc_server & server; nano::ipc::ipc_config_transport & config_transport; std::unique_ptr runner; - std::unique_ptr io_ctx; + std::shared_ptr io_ctx; std::unique_ptr acceptor; }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 054e3669f7..6f16048f3b 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -120,15 +120,16 @@ nano::keypair nano::load_or_create_node_id (std::filesystem::path const & applic } } -nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, std::filesystem::path const & application_path_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) : +nano::node::node (std::shared_ptr io_ctx_a, uint16_t peering_port_a, std::filesystem::path const & application_path_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) : node (io_ctx_a, application_path_a, nano::node_config (peering_port_a), work_a, flags_a, seq) { } -nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) : +nano::node::node (std::shared_ptr io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) : + io_ctx_shared{ io_ctx_a }, + io_ctx{ *io_ctx_shared }, node_id{ load_or_create_node_id (application_path_a) }, write_database_queue (!flags_a.force_use_write_database_queue && (config_a.rocksdb_config.enable)), - io_ctx (io_ctx_a), node_initialized_latch (1), config (config_a), network_params{ config.network_params }, @@ -1414,7 +1415,7 @@ nano::node_wrapper::node_wrapper (std::filesystem::path const & path_a, std::fil auto & node_config = daemon_config.node; node_config.peering_port = 24000; - node = std::make_shared (*io_context, path_a, node_config, work, node_flags_a); + node = std::make_shared (io_context, path_a, node_config, work, node_flags_a); } nano::node_wrapper::~node_wrapper () diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 65ad016ab1..d91c9424c0 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -63,11 +63,11 @@ namespace scheduler backlog_population::config backlog_population_config (node_config const &); outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &); -class node final : public std::enable_shared_from_this +class node final : public std::enable_shared_from_this { public: - node (boost::asio::io_context &, uint16_t, std::filesystem::path const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0); - node (boost::asio::io_context &, std::filesystem::path const &, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0); + node (std::shared_ptr, uint16_t peering_port, std::filesystem::path const & application_path, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0); + node (std::shared_ptr, std::filesystem::path const & application_path, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0); ~node (); public: @@ -134,6 +134,7 @@ class node final : public std::enable_shared_from_this public: const nano::keypair node_id; nano::write_database_queue write_database_queue; + std::shared_ptr io_ctx_shared; boost::asio::io_context & io_ctx; boost::latch node_initialized_latch; nano::node_config config; diff --git a/nano/rpc/rpc.cpp b/nano/rpc/rpc.cpp index 687c9b316e..196feee1ad 100644 --- a/nano/rpc/rpc.cpp +++ b/nano/rpc/rpc.cpp @@ -12,10 +12,11 @@ #include #endif -nano::rpc::rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a) : +nano::rpc::rpc (std::shared_ptr io_ctx_a, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a) : config (std::move (config_a)), - acceptor (io_ctx_a), - io_ctx (io_ctx_a), + io_ctx_shared (io_ctx_a), + io_ctx (*io_ctx_shared), + acceptor (io_ctx), rpc_handler_interface (rpc_handler_interface_a) { rpc_handler_interface.rpc_instance (*this); @@ -78,7 +79,7 @@ void nano::rpc::stop () acceptor.close (); } -std::unique_ptr nano::get_rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a) +std::unique_ptr nano::get_rpc (std::shared_ptr io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a) { std::unique_ptr impl; diff --git a/nano/rpc/rpc.hpp b/nano/rpc/rpc.hpp index 99f66b260a..353d263138 100644 --- a/nano/rpc/rpc.hpp +++ b/nano/rpc/rpc.hpp @@ -20,25 +20,29 @@ class rpc_handler_interface; class rpc { public: - rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a); + rpc (std::shared_ptr, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a); virtual ~rpc (); + void start (); - virtual void accept (); void stop (); - std::uint16_t listening_port () + virtual void accept (); + + std::uint16_t listening_port () const { return acceptor.local_endpoint ().port (); } +public: nano::logger logger{ "rpc" }; nano::rpc_config config; - boost::asio::ip::tcp::acceptor acceptor; + std::shared_ptr io_ctx_shared; boost::asio::io_context & io_ctx; + boost::asio::ip::tcp::acceptor acceptor; nano::rpc_handler_interface & rpc_handler_interface; bool stopped{ false }; }; /** Returns the correct RPC implementation based on TLS configuration */ -std::unique_ptr get_rpc (boost::asio::io_context & io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a); +std::unique_ptr get_rpc (std::shared_ptr, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a); } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 8eef6f5d3b..7d4e0705f7 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1741,7 +1741,7 @@ TEST (rpc, version) auto const rpc_ctx = add_rpc (system, node1); boost::property_tree::ptree request1; request1.put ("action", "version"); - test_response response1 (request1, rpc_ctx.rpc->listening_port (), system.io_ctx); + test_response response1 (request1, rpc_ctx.rpc->listening_port (), *system.io_ctx); ASSERT_TIMELY (5s, response1.status != 0); ASSERT_EQ (200, response1.status); ASSERT_EQ ("1", response1.json.get ("rpc_version")); @@ -2506,7 +2506,7 @@ TEST (rpc, bootstrap) request.put ("action", "bootstrap"); request.put ("address", "::ffff:127.0.0.1"); request.put ("port", node1->network.endpoint ().port ()); - test_response response (request, rpc_ctx.rpc->listening_port (), system0.io_ctx); + test_response response (request, rpc_ctx.rpc->listening_port (), *system0.io_ctx); while (response.status == 0) { system0.poll (); @@ -6046,7 +6046,7 @@ TEST (rpc, simultaneous_calls) const auto ipc_tcp_port = ipc_server.listening_tcp_port (); ASSERT_TRUE (ipc_tcp_port.has_value ()); rpc_config.rpc_process.num_ipc_connections = 8; - nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config, ipc_tcp_port.value ()); + nano::ipc_rpc_processor ipc_rpc_processor (*system.io_ctx, rpc_config, ipc_tcp_port.value ()); nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); rpc.start (); boost::property_tree::ptree request; @@ -6057,7 +6057,7 @@ TEST (rpc, simultaneous_calls) std::array, num> test_responses; for (int i = 0; i < num; ++i) { - test_responses[i] = std::make_unique (request, system.io_ctx); + test_responses[i] = std::make_unique (request, *system.io_ctx); } std::promise promise; @@ -6087,7 +6087,7 @@ TEST (rpc, simultaneous_calls) rpc.stop (); system.stop (); ipc_server.stop (); - system.io_ctx.stop (); + system.io_ctx->stop (); runner.join (); } diff --git a/nano/rpc_test/rpc_context.cpp b/nano/rpc_test/rpc_context.cpp index fc5a601d59..31cadfdc4f 100644 --- a/nano/rpc_test/rpc_context.cpp +++ b/nano/rpc_test/rpc_context.cpp @@ -22,7 +22,7 @@ nano::test::rpc_context::rpc_context (std::shared_ptr & rpc_a, std::u void nano::test::wait_response_impl (nano::test::system & system, rpc_context const & rpc_ctx, boost::property_tree::ptree & request, std::chrono::duration const & time, boost::property_tree::ptree & response_json) { - test_response response (request, rpc_ctx.rpc->listening_port (), system.io_ctx); + test_response response (request, rpc_ctx.rpc->listening_port (), *system.io_ctx); ASSERT_TIMELY (time, response.status != 0); ASSERT_EQ (200, response.status); response_json = response.json; @@ -49,7 +49,7 @@ nano::test::rpc_context nano::test::add_rpc (nano::test::system & system, std::s nano::rpc_config rpc_config (node_a->network_params.network, system.get_available_port (), true); const auto ipc_tcp_port = ipc_server->listening_tcp_port (); debug_assert (ipc_tcp_port.has_value ()); - auto ipc_rpc_processor (std::make_unique (system.io_ctx, rpc_config, ipc_tcp_port.value ())); + auto ipc_rpc_processor (std::make_unique (*system.io_ctx, rpc_config, ipc_tcp_port.value ())); auto rpc (std::make_shared (system.io_ctx, rpc_config, *ipc_rpc_processor)); rpc->start (); diff --git a/nano/slow_test/bootstrap.cpp b/nano/slow_test/bootstrap.cpp index 6492ad6bd4..95662612ad 100644 --- a/nano/slow_test/bootstrap.cpp +++ b/nano/slow_test/bootstrap.cpp @@ -34,7 +34,7 @@ class rpc_wrapper node_rpc_config{}, rpc_config{ node.network_params.network, port, true }, ipc{ node, node_rpc_config }, - ipc_rpc_processor{ system.io_ctx, rpc_config }, + ipc_rpc_processor{ *system.io_ctx, rpc_config }, rpc{ system.io_ctx, rpc_config, ipc_rpc_processor } { } diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index f4522c62b3..379d0ec7b3 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -25,6 +25,49 @@ std::string nano::error_system_messages::message (int ev) const return "Invalid error code"; } +/* + * system + */ + +nano::test::system::system () : + io_ctx{ std::make_shared () } +{ + auto scale_str = std::getenv ("DEADLINE_SCALE_FACTOR"); + if (scale_str) + { + deadline_scaling_factor = std::stod (scale_str); + } +} + +nano::test::system::system (uint16_t count_a, nano::transport::transport_type type_a, nano::node_flags flags_a) : + system () +{ + nodes.reserve (count_a); + for (uint16_t i (0); i < count_a; ++i) + { + add_node (default_config (), flags_a, type_a); + } +} + +nano::test::system::~system () +{ + // Only stop system in destructor to avoid confusing and random bugs when debugging assertions that hit deadline expired condition + stop (); + +#ifndef _WIN32 + // Windows cannot remove the log and data files while they are still owned by this process. + // They will be removed later + + // Clean up tmp directories created by the tests. Since it's sometimes useful to + // see log files after test failures, an environment variable is supported to + // retain the files. + if (std::getenv ("TEST_KEEP_TMPDIRS") == nullptr) + { + nano::remove_temporary_directories (); + } +#endif +} + nano::node & nano::test::system::node (std::size_t index) const { debug_assert (index < nodes.size ()); @@ -142,44 +185,6 @@ std::shared_ptr nano::test::system::make_disconnected_node (std::opt return node; } -nano::test::system::system () -{ - auto scale_str = std::getenv ("DEADLINE_SCALE_FACTOR"); - if (scale_str) - { - deadline_scaling_factor = std::stod (scale_str); - } -} - -nano::test::system::system (uint16_t count_a, nano::transport::transport_type type_a, nano::node_flags flags_a) : - system () -{ - nodes.reserve (count_a); - for (uint16_t i (0); i < count_a; ++i) - { - add_node (default_config (), flags_a, type_a); - } -} - -nano::test::system::~system () -{ - // Only stop system in destructor to avoid confusing and random bugs when debugging assertions that hit deadline expired condition - stop (); - -#ifndef _WIN32 - // Windows cannot remove the log and data files while they are still owned by this process. - // They will be removed later - - // Clean up tmp directories created by the tests. Since it's sometimes useful to - // see log files after test failures, an environment variable is supported to - // retain the files. - if (std::getenv ("TEST_KEEP_TMPDIRS") == nullptr) - { - nano::remove_temporary_directories (); - } -#endif -} - void nano::test::system::ledger_initialization_set (std::vector const & reps, nano::amount const & reserve) { nano::block_hash previous = nano::dev::genesis->hash (); @@ -285,7 +290,7 @@ void nano::test::system::deadline_set (std::chrono::duration std::error_code nano::test::system::poll (std::chrono::nanoseconds const & wait_time) { #if NANO_ASIO_HANDLER_TRACKING == 0 - io_ctx.run_one_for (wait_time); + io_ctx->run_one_for (wait_time); #else nano::timer<> timer; timer.start (); @@ -331,7 +336,7 @@ void nano::test::system::delay_ms (std::chrono::milliseconds const & delay) auto endtime = now + delay; while (now <= endtime) { - io_ctx.run_one_for (endtime - now); + io_ctx->run_one_for (endtime - now); now = std::chrono::steady_clock::now (); } } diff --git a/nano/test_common/system.hpp b/nano/test_common/system.hpp index 4230742e90..00808006aa 100644 --- a/nano/test_common/system.hpp +++ b/nano/test_common/system.hpp @@ -74,7 +74,7 @@ namespace test uint16_t get_available_port (); public: - boost::asio::io_context io_ctx; + std::shared_ptr io_ctx; std::vector> nodes; nano::stats stats; nano::logger logger{ "tests" };