Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use coroutines in tcp listener #4581

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_executable(
fakes/websocket_client.hpp
fakes/work_peer.hpp
active_transactions.cpp
async.cpp
backlog.cpp
block.cpp
block_store.cpp
Expand Down
52 changes: 52 additions & 0 deletions nano/core_test/async.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <nano/lib/async.hpp>
#include <nano/lib/thread_runner.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

#include <boost/asio.hpp>

#include <chrono>

using namespace std::chrono_literals;

TEST (async, sleep)
{
auto io_ctx = std::make_shared<asio::io_context> ();
nano::thread_runner runner{ io_ctx, 1 };
nano::async::strand strand{ io_ctx->get_executor () };

auto fut = asio::co_spawn (
strand,
[&] () -> asio::awaitable<void> {
co_await nano::async::sleep_for (500ms);
},
asio::use_future);

ASSERT_EQ (fut.wait_for (100ms), std::future_status::timeout);
ASSERT_EQ (fut.wait_for (1s), std::future_status::ready);
}

TEST (async, cancellation)
{
auto io_ctx = std::make_shared<asio::io_context> ();
nano::thread_runner runner{ io_ctx, 1 };
nano::async::strand strand{ io_ctx->get_executor () };

nano::async::cancellation cancellation{ strand };

auto fut = asio::co_spawn (
strand,
[&] () -> asio::awaitable<void> {
co_await nano::async::sleep_for (10s);
},
asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));

ASSERT_EQ (fut.wait_for (500ms), std::future_status::timeout);

cancellation.emit ();

ASSERT_EQ (fut.wait_for (500ms), std::future_status::ready);
ASSERT_NO_THROW (fut.get ());
}
60 changes: 60 additions & 0 deletions nano/lib/async.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#pragma once

#include <nano/lib/utility.hpp>

#include <boost/asio.hpp>

namespace asio = boost::asio;

namespace nano::async
{
using strand = asio::strand<asio::io_context::executor_type>;

inline asio::awaitable<void> setup_this_coro ()
{
co_await asio::this_coro::throw_if_cancelled (false);
}

inline asio::awaitable<void> sleep_for (auto duration)
{
asio::steady_timer timer{ co_await asio::this_coro::executor };
timer.expires_after (duration);
boost::system::error_code ec; // Swallow potential error from coroutine cancellation
co_await timer.async_wait (asio::redirect_error (asio::use_awaitable, ec));
debug_assert (!ec || ec == asio::error::operation_aborted);
}

/**
* A cancellation signal that can be emitted from any thread.
* I follows the same semantics as asio::cancellation_signal.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I follows

Type-o here.

*/
class cancellation
{
public:
explicit cancellation (nano::async::strand & strand) :
strand{ strand }
{
}

void emit (asio::cancellation_type type = asio::cancellation_type::all)
{
asio::dispatch (strand, asio::use_future ([this, type] () {
signal.emit (type);
}))
.wait ();
}

auto slot ()
{
// Ensure that the slot is only connected once
debug_assert (std::exchange (slotted, true) == false);
return signal.slot ();
}

private:
nano::async::strand & strand;
asio::cancellation_signal signal;

bool slotted{ false };
};
}