Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework handshake visitor #4504

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ enum class detail : uint8_t
tcp_read_error,
tcp_write_error,

// tcp_server
handshake,
handshake_abort,
handshake_error,
handshake_network_error,
handshake_initiate,
handshake_response,
handshake_response_invalid,

// ipc
invocations,

Expand Down
160 changes: 105 additions & 55 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ nano::transport::tcp_server::~tcp_server ()
return;
}

node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", fmt::streamed (remote_endpoint));

if (socket->type () == nano::transport::socket::type_t::bootstrap)
{
Expand Down Expand Up @@ -341,7 +341,7 @@ void nano::transport::tcp_server::start ()
return;
}

node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", fmt::streamed (remote_endpoint));

receive_message ();
}
Expand Down Expand Up @@ -374,7 +374,7 @@ void nano::transport::tcp_server::receive_message ()
node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})",
ec.message (),
to_string (this_l->message_deserializer->status),
nano::util::to_str (this_l->remote_endpoint));
fmt::streamed (this_l->remote_endpoint));

this_l->stop ();
}
Expand Down Expand Up @@ -412,14 +412,18 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
// Avoid too much noise about `duplicate_publish_message` errors
node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (message_deserializer->status),
nano::util::to_str (remote_endpoint));
fmt::streamed (remote_endpoint));
}
}

if (should_continue)
{
receive_message ();
}
else
{
stop ();
}
}

bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message)
Expand All @@ -429,6 +433,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
{
return false;
}

node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in);

debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());
Expand All @@ -448,27 +453,39 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
{
handshake_message_visitor handshake_visitor{ shared_from_this () };
message->visit (handshake_visitor);
if (handshake_visitor.process)
{
queue_realtime (std::move (message));
return true;
}
else if (handshake_visitor.bootstrap)

switch (handshake_visitor.result)
{
if (!to_bootstrap_connection ())
case handshake_message_visitor::status::abort:
{
stop ();
return false;
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort);
node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));

return false; // Stop receiving new messages
}
}
else
{
// Neither handshake nor bootstrap received when in handshake mode
node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})",
nano::to_string (message->header.type),
nano::util::to_str (remote_endpoint));
case handshake_message_visitor::status::progress:
{
return true; // Continue handshake
}
case handshake_message_visitor::status::realtime:
{
queue_realtime (std::move (message));
return true; // Continue receiving new messages
}
case handshake_message_visitor::status::bootstrap:
{
if (!to_bootstrap_connection ())
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));

return true;
return false; // Switch failed, stop receiving new messages
}
else
{
// Fall through to process the bootstrap message
}
}
}
}
else if (is_realtime_connection ())
Expand All @@ -481,7 +498,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
}
return true;
}
// the server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
// The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
if (is_bootstrap_connection ())
{
bootstrap_message_visitor bootstrap_visitor{ shared_from_this () };
Expand Down Expand Up @@ -518,47 +535,71 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
{
return;
}

if (node->flags.disable_tcp_realtime)
{
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
result = status::abort;
return; // Abort
}
if (!message.query && !message.response)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint));

if (message.query && server->handshake_query_received)
result = status::abort;
return; // Abort
}
if (message.query && server->handshake_received) // Second handshake message should be a response only
{
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
result = status::abort;
return; // Abort
}

server->handshake_query_received = true;
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);

node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", nano::util::to_str (server->remote_endpoint));
server->handshake_received = true;

node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be within if (message.query) since it says it's a query? Or reworded to handshake message?


if (message.query)
{
// Sends response + our own query
server->send_handshake_response (*message.query, message.is_v2 ());
result = status::progress; // Continue handshake
}
if (message.response)
{
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
{
server->to_realtime_connection (message.response->node_id);
bool success = server->to_realtime_connection (message.response->node_id);
if (success)
{
result = status::realtime; // Switched to realtime
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint));

result = status::abort;
return;
}
}
else
{
// Stop invalid handshake
server->stop ();
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint));

result = status::abort;
return;
}
}

process = true;
}

void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2)
Expand All @@ -568,11 +609,13 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
{
return;
}

auto response = node->network.prepare_handshake_response (query, v2);
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };

// TODO: Use channel
node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint));

auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
Expand All @@ -582,36 +625,38 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
}
if (ec)
{
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint));

// Stop invalid handshake
this_l->stop ();
}
else
{
node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out);
}
});
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message)
{
bootstrap = true;
result = status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message)
{
bootstrap = true;
result = status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message)
{
bootstrap = true;
result = status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message)
{
bootstrap = true;
result = status::bootstrap;
}

/*
Expand Down Expand Up @@ -780,7 +825,7 @@ void nano::transport::tcp_server::timeout ()
}
if (socket->has_timed_out ())
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));

{
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
Expand Down Expand Up @@ -834,7 +879,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection ()
++node->tcp_listener->bootstrap_count;
socket->type_set (nano::transport::socket::type_t::bootstrap);

node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", fmt::streamed (remote_endpoint));

return true;
}
Expand All @@ -846,17 +891,22 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const &
{
return false;
}
if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime)
if (node->flags.disable_tcp_realtime)
{
return false;
}
if (socket->type () != nano::transport::socket::type_t::undefined)
{
remote_node_id = node_id;
++node->tcp_listener->realtime_count;
socket->type_set (nano::transport::socket::type_t::realtime);
return false;
}

node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", nano::util::to_str (remote_endpoint));
remote_node_id = node_id;
++node->tcp_listener->realtime_count;
socket->type_set (nano::transport::socket::type_t::realtime);

return true;
}
return false;
node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint));

return true;
}

bool nano::transport::tcp_server::is_undefined_connection () const
Expand Down
13 changes: 10 additions & 3 deletions nano/node/transport/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
std::weak_ptr<nano::node> const node;
nano::mutex mutex;
std::atomic<bool> stopped{ false };
std::atomic<bool> handshake_query_received{ false };
std::atomic<bool> handshake_received{ false };
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{};
Expand Down Expand Up @@ -99,8 +99,15 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
class handshake_message_visitor : public nano::message_visitor
{
public:
bool process{ false };
bool bootstrap{ false };
enum class status
{
abort,
progress,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps "handshake" instead of "progress" to signify what's progressing?

realtime,
bootstrap,
};

status result{ status::abort };

explicit handshake_message_visitor (std::shared_ptr<tcp_server>);

Expand Down