Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Networking cleanup continued #4495

Merged
10 changes: 5 additions & 5 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ TEST (network, peer_max_tcp_attempts)
node->network.merge_peer (node2->network.endpoint ());
}
ASSERT_EQ (0, node->network.size ());
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ())));
ASSERT_FALSE (node->network.tcp_channels.track_reachout (nano::endpoint (node->network.endpoint ().address (), system.get_available_port ())));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out));
}
#endif
Expand All @@ -779,11 +779,11 @@ namespace transport
{
auto address (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x7f000001 + i))); // 127.0.0.1 hex
nano::endpoint endpoint (address, system.get_available_port ());
ASSERT_FALSE (node->network.tcp_channels.reachout (endpoint));
ASSERT_TRUE (node->network.tcp_channels.track_reachout (endpoint));
}
ASSERT_EQ (0, node->network.size ());
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out));
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ())));
ASSERT_FALSE (node->network.tcp_channels.track_reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), system.get_available_port ())));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out));
}
}
Expand Down Expand Up @@ -974,7 +974,7 @@ TEST (network, tcp_no_connect_excluded_peers)
ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ()));

// Should not actively reachout to excluded peers
ASSERT_TRUE (node0->network.reachout (node1->network.endpoint (), true));
ASSERT_FALSE (node0->network.track_reachout (node1->network.endpoint ()));

// Erasing from excluded peers should allow a connection
node0->network.excluded_peers.remove (endpoint1_tcp);
Expand Down Expand Up @@ -1080,7 +1080,7 @@ TEST (network, cleanup_purge)
ASSERT_EQ (1, node1.network.size ());

node1.network.cleanup (std::chrono::steady_clock::now ());
ASSERT_EQ (0, node1.network.size ());
ASSERT_TIMELY_EQ (5s, 0, node1.network.size ());
}

TEST (network, loopback_channel)
Expand Down
11 changes: 6 additions & 5 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,19 @@ TEST (peer_container, reachout)
auto outer_node1 = nano::test::add_outer_node (system);
ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node1->network.endpoint ()));
// Make sure having been contacted by them already indicates we shouldn't reach out
ASSERT_TRUE (node1.network.reachout (outer_node1->network.endpoint ()));
ASSERT_FALSE (node1.network.track_reachout (outer_node1->network.endpoint ()));
auto outer_node2 = nano::test::add_outer_node (system);
ASSERT_FALSE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ()));
ASSERT_NE (nullptr, nano::test::establish_tcp (system, node1, outer_node2->network.endpoint ()));
// Reaching out to them once should signal we shouldn't reach out again.
ASSERT_TRUE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ()));
// Make sure we don't purge new items
node1.network.cleanup (std::chrono::steady_clock::now () - std::chrono::seconds (10));
ASSERT_TRUE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ()));
// Make sure we purge old items
node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10));
ASSERT_FALSE (node1.network.reachout (outer_node2->network.endpoint ()));
ASSERT_TIMELY (5s, node1.network.empty ());
ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ()));
}

// This test is similar to network.filter_invalid_version_using with the difference that
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,15 @@ TEST (socket, drop_policy)
});

auto client = std::make_shared<nano::transport::socket> (*node);
nano::transport::channel_tcp channel{ *node, client };
auto channel = std::make_shared<nano::transport::channel_tcp> (*node, client);
nano::test::counted_completion write_completion (static_cast<unsigned> (total_message_count));

client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()),
[&channel, total_message_count, node, &write_completion, &drop_policy, client] (boost::system::error_code const & ec_a) mutable {
for (int i = 0; i < total_message_count; i++)
{
std::vector<uint8_t> buff (1);
channel.send_buffer (
channel->send_buffer (
nano::shared_const_buffer (std::move (buff)), [&write_completion, client] (boost::system::error_code const & ec, size_t size_a) mutable {
client.reset ();
write_completion.increment ();
Expand Down
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class type
tcp,
tcp_server,
tcp_listener,
tcp_channels,
prunning,
conf_processor_bounded,
conf_processor_unbounded,
Expand Down
31 changes: 31 additions & 0 deletions nano/lib/random.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <random>

namespace nano
{
/**
* Not safe for any crypto related code, use for non-crypto PRNG only.
*/
class random_generator final
{
public:
/// Generate a random number in the range [min, max)
auto random (auto min, auto max)
{
release_assert (min < max);
std::uniform_int_distribution<decltype (min)> dist (min, max - 1);
return dist (rng);
}

/// Generate a random number in the range [0, max)
auto random (auto max)
{
return random (decltype (max){ 0 }, max);
}

private:
std::random_device device;
std::default_random_engine rng{ device () };
};
}
7 changes: 6 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum class type : uint8_t
http_callback,
ipc,
tcp,
tcp_channels,
channel,
socket,
confirmation_height,
Expand Down Expand Up @@ -70,7 +71,6 @@ enum class detail : uint8_t
ok,
loop,
loop_cleanup,
loop_keepalive,
total,
process,
processed,
Expand Down Expand Up @@ -216,6 +216,11 @@ enum class detail : uint8_t
message_size_too_big,
outdated_version,

// network
loop_keepalive,
loop_reachout,
merge_peer,

// tcp
tcp_accept_success,
tcp_accept_failure,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::network_keepalive:
thread_role_name_string = "Net keepalive";
break;
case nano::thread_role::name::network_reachout:
thread_role_name_string = "Net reachout";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ enum class name
rep_tiers,
network_cleanup,
network_keepalive,
network_reachout,
};

/*
Expand Down
68 changes: 56 additions & 12 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void nano::network::start ()
run_keepalive ();
});

reachout_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::network_reachout);
run_reachout ();
});

if (!node.flags.disable_tcp_realtime)
{
tcp_channels.start ();
Expand Down Expand Up @@ -87,6 +92,10 @@ void nano::network::stop ()
{
cleanup_thread.join ();
}
if (reachout_thread.joinable ())
{
reachout_thread.join ();
}

port = 0;
}
Expand Down Expand Up @@ -126,12 +135,11 @@ void nano::network::run_cleanup ()
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s);
lock.unlock ();

if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup);

Expand All @@ -154,18 +162,54 @@ void nano::network::run_keepalive ()
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.keepalive_period);
lock.unlock ();

if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive);

flood_keepalive (0.75f);
flood_keepalive_self (0.25f);

tcp_channels.keepalive ();

lock.lock ();
}
}

void nano::network::run_reachout ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.merge_period);
if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_reachout);

auto keepalive = tcp_channels.sample_keepalive ();
if (keepalive)
{
for (auto const & peer : keepalive->peers)
{
if (stopped)
{
return;
}

merge_peer (peer);

// Throttle reachout attempts
std::this_thread::sleep_for (node.network_params.network.merge_period);
}
}

lock.lock ();
}
}
Expand Down Expand Up @@ -411,10 +455,11 @@ void nano::network::merge_peers (std::array<nano::endpoint, 8> const & peers_a)

void nano::network::merge_peer (nano::endpoint const & peer_a)
{
if (!reachout (peer_a, node.config.allow_local_peers))
if (track_reachout (peer_a))
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.network.tcp_channels.start_tcp (peer_a);
node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer);

tcp_channels.start_tcp (peer_a);
}
}

Expand All @@ -436,15 +481,14 @@ bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_lo
return result;
}

bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers)
bool nano::network::track_reachout (nano::endpoint const & endpoint_a)
{
// Don't contact invalid IPs
bool error = not_a_peer (endpoint_a, allow_local_peers);
if (!error)
if (not_a_peer (endpoint_a, node.config.allow_local_peers))
{
error = tcp_channels.reachout (endpoint_a);
return false;
}
return error;
return tcp_channels.track_reachout (endpoint_a);
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a)
Expand Down
8 changes: 5 additions & 3 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class network final
void send_keepalive_self (std::shared_ptr<nano::transport::channel> const &);
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
bool not_a_peer (nano::endpoint const &, bool allow_local_peers);
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);
std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t = 0, bool = true);
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t);
// Desired fanout for a given scale
Expand Down Expand Up @@ -111,6 +111,7 @@ class network final
void run_processing ();
void run_cleanup ();
void run_keepalive ();
void run_reachout ();
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);

private: // Dependencies
Expand All @@ -137,6 +138,7 @@ class network final
std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
std::thread cleanup_thread;
std::thread keepalive_thread;
std::thread reachout_thread;

public:
static unsigned const broadcast_interval_ms = 10;
Expand Down
20 changes: 14 additions & 6 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,6 @@ void nano::node::stop ()
generator.stop ();
final_generator.stop ();
confirmation_height_processor.stop ();
network.stop ();
telemetry.stop ();
websocket.stop ();
bootstrap_server.stop ();
Expand All @@ -696,6 +695,8 @@ void nano::node::stop ()
epoch_upgrader.stop ();
workers.stop ();
local_block_broadcaster.stop ();
network.stop (); // Stop network last to avoid killing in-use sockets

// work pool is not stopped on purpose due to testing setup
}

Expand Down Expand Up @@ -1116,15 +1117,22 @@ void nano::node::add_initial_peers ()
return;
}

auto transaction (store.tx_begin_read ());
for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i)
std::vector<nano::endpoint> initial_peers;
{
nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
if (!network.reachout (endpoint, config.allow_local_peers))
auto transaction = store.tx_begin_read ();
for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i)
{
network.tcp_channels.start_tcp (endpoint);
nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
initial_peers.push_back (endpoint);
}
}

logger.info (nano::log::type::node, "Adding cached initial peers: {}", initial_peers.size ());

for (auto const & peer : initial_peers)
{
network.merge_peer (peer);
}
}

void nano::node::start_election (std::shared_ptr<nano::block> const & block)
Expand Down
3 changes: 3 additions & 0 deletions nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class channel
nano::transport::traffic_type = nano::transport::traffic_type::generic)
= 0;

virtual void close () = 0;

virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
Expand All @@ -50,6 +52,7 @@ class channel
{
return false;
}

virtual bool alive () const
{
return true;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/fake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace transport
return nano::transport::transport_type::fake;
}

void close ()
void close () override
{
closed = true;
}
Expand Down
Loading
Loading