Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handling of exceptions to named_thread_pool #632

Merged
merged 11 commits into from
Jan 25, 2023
27 changes: 16 additions & 11 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,19 @@ struct controller_impl {
conf( cfg ),
chain_id( chain_id ),
read_mode( cfg.read_mode ),
thread_pool( "chain", cfg.thread_pool_size )
thread_pool( "chain" )
{
fork_db.open( [this]( block_timestamp_type timestamp,
const flat_set<digest_type>& cur_features,
const vector<digest_type>& new_features )
{ check_protocol_features( timestamp, cur_features, new_features ); }
);

thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) {
elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} );

set_activation_handler<builtin_protocol_feature_t::preactivate_feature>();
set_activation_handler<builtin_protocol_feature_t::replace_deferred>();
set_activation_handler<builtin_protocol_feature_t::get_sender>();
Expand Down Expand Up @@ -410,7 +415,7 @@ struct controller_impl {
std::vector<std::future<std::vector<char>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
v.emplace_back( async_thread_pool( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
v.emplace_back( post_async_task( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
}
auto it = v.begin();

Expand Down Expand Up @@ -1835,17 +1840,17 @@ struct controller_impl {

auto& bb = std::get<building_block>(pending->_block_stage);

auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
auto action_merkle_fut = post_async_task( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
const bool calc_trx_merkle = !std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests);
std::future<checksum256_type> trx_merkle_fut;
if( calc_trx_merkle ) {
trx_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
trx_merkle_fut = post_async_task( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
}

// Update resource limits:
Expand Down Expand Up @@ -2170,7 +2175,7 @@ struct controller_impl {
std::future<block_state_ptr> create_block_state_future( const block_id_type& id, const signed_block_ptr& b ) {
EOS_ASSERT( b, block_validate_exception, "null block" );

return async_thread_pool( thread_pool.get_executor(), [b, id, control=this]() {
return post_async_task( thread_pool.get_executor(), [b, id, control=this]() {
// no reason for a block_state if fork_db already knows about block
auto existing = control->fork_db.get_block( id );
EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) );
Expand Down
29 changes: 17 additions & 12 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <fc/exception/exception.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <future>
Expand All @@ -10,25 +11,30 @@
namespace eosio { namespace chain {

/**
* Wrapper class for boost asio thread pool and io_context run.
* Wrapper class for thread pool of boost asio io_context run.
* Also names threads so that tools like htop can see thread name.
*/
class named_thread_pool {
public:
// name_prefix is name appended with -## of thread.
// short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars for thread name
/// @param delay_start do not spawn threads in constructor, wait for start() call
named_thread_pool( std::string name_prefix, size_t num_threads, bool delay_start = false );
using on_except_t = std::function<void(const fc::exception& e)>;

// calls stop()
/// @param name_prefix is name appended with -## of thread.
/// A short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars
/// for the thread name.
explicit named_thread_pool( std::string name_prefix );

/// calls stop()
~named_thread_pool();

boost::asio::io_context& get_executor() { return _ioc; }

/// Spawn threads, can be re-started after stop().
/// Assumes start()/stop() called from the same thread or externally protected.
/// @param num_threads is number of threads spawned
/// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread.
/// if an empty function then logs and rethrows exception on thread which will terminate.
/// @throw assert_exception if already started and not stopped.
void start();
void start( size_t num_threads, on_except_t on_except );

/// destroy work guard, stop io_context, join thread_pool
void stop();
Expand All @@ -37,18 +43,17 @@ namespace eosio { namespace chain {
using ioc_work_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;

std::string _name_prefix;
size_t _num_threads;
std::vector<std::thread> _thread_pool;
boost::asio::io_context _ioc;
std::vector<std::thread> _thread_pool;
std::optional<ioc_work_t> _ioc_work;
};


// async on thread_pool and return future
// async on io_context and return future
template<typename F>
auto async_thread_pool( boost::asio::io_context& thread_pool, F&& f ) {
auto post_async_task( boost::asio::io_context& ioc, F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( thread_pool, [task]() { (*task)(); } );
boost::asio::post( ioc, [task]() { (*task)(); } );
return task->get_future();
}

Expand Down
47 changes: 35 additions & 12 deletions libraries/chain/thread_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,52 @@

namespace eosio { namespace chain {

named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, bool delay_start )
named_thread_pool::named_thread_pool( std::string name_prefix )
: _name_prefix( std::move(name_prefix) )
, _num_threads( num_threads )
, _ioc( num_threads )
, _ioc()
{
if( !delay_start ) {
start();
}
}

named_thread_pool::~named_thread_pool() {
stop();
}

void named_thread_pool::start() {
void named_thread_pool::start( size_t num_threads, on_except_t on_except ) {
FC_ASSERT( !_ioc_work, "Thread pool already started" );
_ioc_work.emplace( boost::asio::make_work_guard( _ioc ) );
_ioc.restart();
for( size_t i = 0; i < _num_threads; ++i ) {
_thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, i]() {
_thread_pool.reserve( num_threads );
for( size_t i = 0; i < num_threads; ++i ) {
_thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except, i]() {
std::string tn = name_prefix + "-" + std::to_string( i );
fc::set_os_thread_name( tn );
ioc.run();
try {
fc::set_os_thread_name( tn );
ioc.run();
} catch( const fc::exception& e ) {
if( on_except ) {
on_except( e );
} else {
elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", e.to_detail_string()) );
throw;
}
} catch( const std::exception& e ) {
fc::std_exception_wrapper se( FC_LOG_MESSAGE( warn, "${what}: ", ("what", e.what()) ),
std::current_exception(), BOOST_CORE_TYPEID( e ).name(), e.what() );
if( on_except ) {
on_except( se );
} else {
elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", se.to_detail_string()) );
throw;
}
} catch( ... ) {
if( on_except ) {
fc::unhandled_exception ue( FC_LOG_MESSAGE( warn, "unknown exception" ), std::current_exception() );
on_except( ue );
} else {
elog( "Exiting thread ${t} on unknown exception", ("t", tn) );
throw;
}
}
} );
}
}
Expand All @@ -41,4 +64,4 @@ void named_thread_pool::stop() {
}


} } // eosio::chain
} } // eosio::chain
2 changes: 1 addition & 1 deletion libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction
trx_type t,
uint32_t max_variable_sig_size )
{
return async_thread_pool( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_plugin/test/test_account_query_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, TESTER) { try {
produce_block();
create_account(tester_account);

named_thread_pool thread_pool( "test", 5 );
named_thread_pool thread_pool( "test" );
thread_pool.start( 5, {} );

for( size_t i = 0; i < 100; ++i ) {
boost::asio::post( thread_pool.get_executor(), [&aq_db, tester_account, role]() {
Expand Down
11 changes: 6 additions & 5 deletions plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,11 @@ class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
app().post(appbase::priority::high, [this] ()
{
try {
my->plugin_state->thread_pool =
std::make_unique<eosio::chain::named_thread_pool>( "http", my->plugin_state->thread_pool_size );
my->plugin_state->thread_pool.start( my->plugin_state->thread_pool_size, [](const fc::exception& e) {
fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
app().quit();
} );

if(my->listen_endpoint) {
try {
my->create_beast_server(false);
Expand Down Expand Up @@ -465,9 +468,7 @@ class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
if(my->beast_unix_server)
my->beast_unix_server->stop_listening();

if( my->plugin_state->thread_pool ) {
my->plugin_state->thread_pool->stop();
}
my->plugin_state->thread_pool.stop();

my->beast_server.reset();
my->beast_https_server.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class beast_http_listener : public std::enable_shared_from_this<beast_http_liste
beast_http_listener& operator=(const beast_http_listener&) = delete;
beast_http_listener& operator=(beast_http_listener&&) = delete;

beast_http_listener(std::shared_ptr<http_plugin_state> plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool->get_executor()), socket_(plugin_state_->thread_pool->get_executor()), accept_error_timer_(plugin_state_->thread_pool->get_executor()) {}
beast_http_listener(std::shared_ptr<http_plugin_state> plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool.get_executor()), socket_(plugin_state_->thread_pool.get_executor()), accept_error_timer_(plugin_state_->thread_pool.get_executor()) {}

virtual ~beast_http_listener() {
try {
Expand Down Expand Up @@ -99,7 +99,7 @@ class beast_http_listener : public std::enable_shared_from_this<beast_http_liste

void stop_listening() {
if(is_listening_) {
plugin_state_->thread_pool->stop();
plugin_state_->thread_pool.stop();
is_listening_ = false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/http_plugin/include/eosio/http_plugin/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct http_plugin_state {
bool keep_alive = false;

uint16_t thread_pool_size = 2;
std::unique_ptr<eosio::chain::named_thread_pool> thread_pool;
eosio::chain::named_thread_pool thread_pool{ "http" };

fc::logger& logger;

Expand Down Expand Up @@ -162,7 +162,7 @@ auto make_http_response_handler(std::shared_ptr<http_plugin_state> plugin_state,
plugin_state->bytes_in_flight += payload_size;

// post back to an HTTP thread to allow the response handler to be called from any thread
boost::asio::post(plugin_state->thread_pool->get_executor(),
boost::asio::post(plugin_state->thread_pool.get_executor(),
[plugin_state, session_ptr, code, deadline, start, payload_size, response = std::move(response)]() {
try {
plugin_state->bytes_in_flight -= payload_size;
Expand Down
Loading