Skip to content

Commit

Permalink
Add tests for purging dead channels
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 9, 2022
1 parent 602d4c8 commit 618052a
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 1 deletion.
146 changes: 146 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/socket.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/test_common/network.hpp>
Expand Down Expand Up @@ -1412,3 +1414,147 @@ TEST (network, fill_keepalive_self)
system.nodes[0]->network.fill_keepalive_self (target);
ASSERT_TRUE (target[2].port () == system.nodes[1]->network.port);
}

/*
* Tests that channel and channel container removes channels with dead local sockets
*/
TEST (network, purge_dead_channel_outgoing)
{
nano::test::system system{};

nano::node_flags flags;
// Disable non realtime sockets
flags.disable_bootstrap_bulk_push_client = true;
flags.disable_bootstrap_bulk_pull_server = true;
flags.disable_bootstrap_listener = true;
flags.disable_lazy_bootstrap = true;
flags.disable_legacy_bootstrap = true;
flags.disable_wallet_bootstrap = true;

auto & node1 = *system.add_node (flags);

// We expect one incoming and one outgoing connection
std::shared_ptr<nano::socket> outgoing;
std::shared_ptr<nano::socket> incoming;

std::atomic<int> connected_count{ 0 };
node1.observers.socket_connected.add ([&] (nano::socket & socket) {
connected_count++;
outgoing = socket.shared_from_this ();

std::cout << "connected: " << socket.remote_endpoint () << std::endl;
});

std::atomic<int> accepted_count{ 0 };
node1.observers.socket_accepted.add ([&] (nano::socket & socket) {
accepted_count++;
incoming = socket.shared_from_this ();

std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
});

auto & node2 = *system.add_node (flags);

ASSERT_TIMELY_EQ (5s, connected_count, 1);
ASSERT_ALWAYS_EQ (1s, connected_count, 1);

ASSERT_TIMELY_EQ (5s, accepted_count, 1);
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);

ASSERT_EQ (node1.network.size (), 1);
ASSERT_ALWAYS_EQ (1s, node1.network.size (), 1);

// Store reference to the only channel
auto channels = node1.network.list ();
ASSERT_EQ (channels.size (), 1);
auto channel = channels.front ();
ASSERT_TRUE (channel);

// When socket is dead ensure channel knows about that
ASSERT_TRUE (channel->alive ());
outgoing->close ();
ASSERT_TIMELY (5s, !channel->alive ());

// Shortly after that a new channel should be established
ASSERT_TIMELY_EQ (5s, connected_count, 2);
ASSERT_ALWAYS_EQ (1s, connected_count, 2);

// Check that a new channel is healthy
auto channels2 = node1.network.list ();
ASSERT_EQ (channels2.size (), 1);
auto channel2 = channels2.front ();
ASSERT_TRUE (channel2);
ASSERT_TRUE (channel2->alive ());
}

/*
* Tests that channel and channel container removes channels with dead remote sockets
*/
TEST (network, purge_dead_channel_incoming)
{
nano::test::system system{};

nano::node_flags flags;
// Disable non realtime sockets
flags.disable_bootstrap_bulk_push_client = true;
flags.disable_bootstrap_bulk_pull_server = true;
flags.disable_bootstrap_listener = true;
flags.disable_lazy_bootstrap = true;
flags.disable_legacy_bootstrap = true;
flags.disable_wallet_bootstrap = true;

auto & node1 = *system.add_node (flags);

// We expect one incoming and one outgoing connection
std::shared_ptr<nano::socket> outgoing;
std::shared_ptr<nano::socket> incoming;

std::atomic<int> connected_count{ 0 };
node1.observers.socket_connected.add ([&] (nano::socket & socket) {
connected_count++;
outgoing = socket.shared_from_this ();

std::cout << "connected: " << socket.remote_endpoint () << std::endl;
});

std::atomic<int> accepted_count{ 0 };
node1.observers.socket_accepted.add ([&] (nano::socket & socket) {
accepted_count++;
incoming = socket.shared_from_this ();

std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
});

auto & node2 = *system.add_node (flags);

ASSERT_TIMELY_EQ (5s, connected_count, 1);
ASSERT_ALWAYS_EQ (1s, connected_count, 1);

ASSERT_TIMELY_EQ (5s, accepted_count, 1);
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);

ASSERT_EQ (node2.network.size (), 1);
ASSERT_ALWAYS_EQ (1s, node2.network.size (), 1);

// Store reference to the only channel
auto channels = node2.network.list ();
ASSERT_EQ (channels.size (), 1);
auto channel = channels.front ();
ASSERT_TRUE (channel);

// When remote socket is dead ensure channel knows about that
ASSERT_TRUE (channel->alive ());
incoming->close ();
ASSERT_TIMELY (5s, !channel->alive ());

// Shortly after that a new channel should be established
ASSERT_TIMELY_EQ (5s, accepted_count, 2);
ASSERT_ALWAYS_EQ (1s, accepted_count, 2);

// Check that a new channel is healthy
auto channels2 = node2.network.list ();
ASSERT_EQ (channels2.size (), 1);
auto channel2 = channels2.front ();
ASSERT_TRUE (channel2);
ASSERT_TRUE (channel2->alive ());
}
2 changes: 1 addition & 1 deletion nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::
tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
udp_channels.list (result, minimum_version_a);
nano::random_pool_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
if (count_a > 0 && result.size () > count_a)
{
result.resize (count_a, nullptr);
}
Expand Down
3 changes: 3 additions & 0 deletions nano/node/node_observers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class node_observers final
nano::observer_set<> disconnect;
nano::observer_set<nano::root const &> work_cancel;
nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> telemetry;

nano::observer_set<nano::socket &> socket_connected;
nano::observer_set<nano::socket &> socket_accepted;
};

std::unique_ptr<container_info_component> collect_container_info (node_observers & node_observers, std::string const & name);
Expand Down
3 changes: 3 additions & 0 deletions nano/node/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::fu
checkup ();
auto this_l (shared_from_this ());
set_default_timeout ();

this_l->tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand,
[this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
Expand All @@ -74,6 +75,7 @@ void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::fu
this_l->set_last_completion ();
}
this_l->remote = endpoint_a;
this_l->node.observers.socket_connected.notify (*this_l);
callback (ec);
}));
}
Expand Down Expand Up @@ -467,6 +469,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
this_l->node.observers.socket_accepted.notify (*new_connection);
if (cbk (new_connection, ec_a))
{
this_l->on_connection (std::move (cbk));
Expand Down
10 changes: 10 additions & 0 deletions nano/test_common/testutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@
ASSERT_TRUE (condition); \
}

/*
* Asserts that condition is always true during the specified amount of time
*/
#define ASSERT_ALWAYS_EQ(time, val1, val2) \
system.deadline_set (time); \
while (!system.poll ()) \
{ \
ASSERT_EQ (val1, val2); \
}

/*
* Asserts that condition is never true during the specified amount of time
*/
Expand Down

0 comments on commit 618052a

Please sign in to comment.