Cleanup inactive channels from rep crawler #2132

Merged 2 commits on Jul 5, 2019
57 changes: 57 additions & 0 deletions nano/core_test/node.cpp
@@ -1926,6 +1926,63 @@ TEST (node, rep_weight)
	ASSERT_EQ (*channel0, reps[0].channel_ref ());
}

TEST (node, rep_remove)
{
	nano::system system (24000, 1);
	auto & node (*system.nodes[0]);
	// Add inactive UDP representative channel
	nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24001);
	auto channel0 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint0));
	nano::amount amount100 (100);
	node.network.udp_channels.insert (endpoint0, nano::protocol_version);
	nano::keypair keypair1;
	node.rep_crawler.response (channel0, keypair1.pub, amount100);
	ASSERT_EQ (1, node.rep_crawler.representative_count ());
	auto reps (node.rep_crawler.representatives (1));
	ASSERT_EQ (1, reps.size ());
	ASSERT_EQ (100, reps[0].weight.number ());
	ASSERT_EQ (keypair1.pub, reps[0].account);
	ASSERT_EQ (*channel0, reps[0].channel_ref ());
	// Add working representative
	auto node1 = system.add_node (nano::node_config (24002, system.logging));
	system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
	auto channel1 (node.network.find_channel (node1->network.endpoint ()));
	ASSERT_NE (nullptr, channel1);
	node.rep_crawler.response (channel1, nano::test_genesis_key.pub, nano::genesis_amount);
	ASSERT_EQ (2, node.rep_crawler.representative_count ());
	// Add inactive TCP representative channel
	nano::node_init init;
	auto node2 (std::make_shared<nano::node> (init, system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (24003, system.logging), system.work));
	std::atomic<bool> done{ false };
	std::weak_ptr<nano::node> node_w (node.shared ());
	node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &done](std::shared_ptr<nano::transport::channel> channel2) {
		if (auto node_l = node_w.lock ())
		{
			nano::keypair keypair2;
			node_l->rep_crawler.response (channel2, keypair2.pub, nano::Gxrb_ratio);
			ASSERT_EQ (3, node_l->rep_crawler.representative_count ());
			done = true;
		}
	});
	system.deadline_set (10s);
	while (!done)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	node2->stop ();
	// Remove inactive representatives
	system.deadline_set (10s);
	while (node.rep_crawler.representative_count () != 1)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	reps = node.rep_crawler.representatives (1);
	ASSERT_EQ (nano::test_genesis_key.pub, reps[0].account);
	ASSERT_EQ (1, node.network.size ());
	auto list (node.network.list (1));
	ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ());
}

// Test that nodes can disable representative voting
TEST (node, no_voting)
{
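The new rep_remove test leans on the deadline/poll idiom used throughout these tests: set a deadline with system.deadline_set, then repeatedly call system.poll () until the asserted condition becomes true or the deadline trips ASSERT_NO_ERROR. A minimal standalone sketch of that idiom, with a hypothetical poll_until helper and a sleep standing in for an io_context iteration, might look like this:

// Standalone sketch of the poll-until-deadline idiom; poll_until and the sleep
// are illustrative stand-ins, not part of nano::system.
#include <chrono>
#include <functional>
#include <thread>

// Poll a predicate until it holds or the deadline passes; returns false on timeout.
bool poll_until (std::function<bool ()> const & predicate_a, std::chrono::seconds timeout_a)
{
	auto deadline (std::chrono::steady_clock::now () + timeout_a);
	while (!predicate_a ())
	{
		if (std::chrono::steady_clock::now () > deadline)
		{
			return false;
		}
		// The real test drives the node's io_context via system.poll () instead of sleeping
		std::this_thread::sleep_for (std::chrono::milliseconds (10));
	}
	return true;
}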
50 changes: 41 additions & 9 deletions nano/node/repcrawler.cpp
@@ -36,6 +36,7 @@ void nano::rep_crawler::ongoing_crawl ()
{
	auto now (std::chrono::steady_clock::now ());
	auto total_weight_l (total_weight ());
	cleanup_reps ();
	query (get_crawl_targets (total_weight_l));
	auto sufficient_weight (total_weight_l > node.config.online_weight_minimum.number ());
	// If online weight drops below minimum, reach out to preconfigured peers
@@ -56,22 +57,14 @@

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::get_crawl_targets (nano::uint128_t total_weight_a)
{
	std::unordered_set<std::shared_ptr<nano::transport::channel>> channels;
	constexpr size_t conservative_count = 10;
	constexpr size_t aggressive_count = 40;

	// Crawl more aggressively if we lack sufficient total peer weight.
	bool sufficient_weight (total_weight_a > node.config.online_weight_minimum.number ());
	uint16_t required_peer_count = sufficient_weight ? conservative_count : aggressive_count;
	std::lock_guard<std::mutex> lock (probable_reps_mutex);

	// First, add known rep endpoints, ordered by ascending last-requested time.
	for (auto i (probable_reps.get<tag_last_request> ().begin ()), n (probable_reps.get<tag_last_request> ().end ()); i != n && channels.size () < required_peer_count; ++i)
	{
		channels.insert (i->channel);
	};

	// Add additional random peers. We do this even if we have enough weight, in order to pick up reps
	// Add random peers. We do this even if we have enough weight, in order to pick up reps
	// that didn't respond when first observed. If the current total weight isn't sufficient, this
	// will be more aggressive. When the node first starts, the rep container is empty and all
	// endpoints will originate from random peers.
@@ -205,6 +198,45 @@ void nano::rep_crawler::on_rep_request (std::shared_ptr<nano::transport::channel
	}
}

void nano::rep_crawler::cleanup_reps ()
{
	std::vector<std::shared_ptr<nano::transport::channel>> channels;
	{
		// Check known rep channels
		std::lock_guard<std::mutex> lock (probable_reps_mutex);
		for (auto i (probable_reps.get<tag_last_request> ().begin ()), n (probable_reps.get<tag_last_request> ().end ()); i != n; ++i)
		{
			channels.push_back (i->channel);
		}
	}
	// Remove reps with inactive channels
	for (auto i : channels)
	{
		bool equal (false);
		if (i->get_type () == nano::transport::transport_type::tcp)
		{
			auto find_channel (node.network.tcp_channels.find_channel (i->get_tcp_endpoint ()));
			if (find_channel != nullptr && *find_channel == *static_cast<nano::transport::channel_tcp *> (i.get ()))
			{
				equal = true;
			}
		}
		else if (i->get_type () == nano::transport::transport_type::udp)
		{
			auto find_channel (node.network.udp_channels.channel (i->get_endpoint ()));
			if (find_channel != nullptr && *find_channel == *static_cast<nano::transport::channel_udp *> (i.get ()))
			{
				equal = true;
			}
		}
		if (!equal)
		{
			std::lock_guard<std::mutex> lock (probable_reps_mutex);
			probable_reps.get<tag_channel_ref> ().erase (*i);
		}
	}
}

std::vector<nano::representative> nano::rep_crawler::representatives (size_t count_a)
{
	std::vector<representative> result;
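cleanup_reps splits its work so that probable_reps_mutex is never held while it calls into the TCP/UDP channel containers: it snapshots the known channels under the lock, probes each channel's liveness with the lock released, then re-acquires the lock only to erase stale entries. A simplified sketch of that locking pattern, with placeholder types instead of nano's channel and multi-index containers, might look like this:

// Sketch only: rep_table, its string keys, and is_active_a are stand-ins for
// nano's probable_reps multi-index and channel lookups.
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

struct rep_table
{
	std::mutex mutex;
	std::unordered_map<std::string, int> reps; // endpoint -> weight, placeholder types

	// Erase every entry whose channel is no longer active, without holding the
	// lock while the (potentially network-touching) liveness check runs
	template <typename Pred>
	void cleanup (Pred is_active_a)
	{
		std::vector<std::string> snapshot;
		{
			// Step 1: copy the keys under the lock
			std::lock_guard<std::mutex> lock (mutex);
			for (auto const & entry : reps)
			{
				snapshot.push_back (entry.first);
			}
		}
		// Step 2: probe each channel with the lock released
		for (auto const & key : snapshot)
		{
			if (!is_active_a (key))
			{
				// Step 3: re-acquire the lock only to erase the stale entry
				std::lock_guard<std::mutex> lock (mutex);
				reps.erase (key);
			}
		}
	}
};

Because the cleanup runs from ongoing_crawl, the test's second deadline loop simply polls until a crawl pass has erased the two representatives whose channels have disappeared from their containers.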
3 changes: 3 additions & 0 deletions nano/node/repcrawler.hpp
@@ -130,6 +130,9 @@ class rep_crawler
	/** When a rep request is made, this is called to update the last-request timestamp. */
	void on_rep_request (std::shared_ptr<nano::transport::channel> channel_a);

	/** Clean representatives with inactive channels */
	void cleanup_reps ();

	/** Protects the probable_reps container */
	mutable std::mutex probable_reps_mutex;

8 changes: 6 additions & 2 deletions nano/node/transport/tcp.cpp
@@ -91,7 +91,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
	assert (endpoint.address ().is_v6 ());
	auto udp_endpoint (nano::transport::map_tcp_to_endpoint (endpoint));
	bool error (true);
	if (!node.network.not_a_peer (udp_endpoint, node.config.allow_local_peers))
	if (!node.network.not_a_peer (udp_endpoint, node.config.allow_local_peers) && !stopped)
	{
		std::unique_lock<std::mutex> lock (mutex);
		auto existing (channels.get<endpoint_tag> ().find (endpoint));
@@ -340,6 +340,7 @@ void nano::transport::tcp_channels::stop ()
			i->channel->response_server = nullptr;
		}
	}
	channels.clear ();
}

bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a)
@@ -428,7 +429,10 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
	node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() {
		if (auto node_l = node_w.lock ())
		{
			node_l->network.tcp_channels.ongoing_keepalive ();
			if (!node_l->network.tcp_channels.stopped)
			{
				node_l->network.tcp_channels.ongoing_keepalive ();
			}
		}
	});
}
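The three tcp.cpp changes are all guards around the stopped flag: insert () refuses new channels once the container is stopped, stop () clears the channel list, and the self-rescheduling ongoing_keepalive () stops re-arming itself, so a late insert or keepalive pass cannot repopulate a container that has just been cleared. A minimal sketch of that guard, with placeholder members and a polling loop standing in for nano's alarm-driven keepalive, might look like this:

// Sketch only: channel_container and its int channels are illustrative, not nano types.
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

class channel_container
{
public:
	// Mirrors tcp_channels::insert: refuse new entries once stopped
	bool insert (int channel_a)
	{
		bool error (true);
		if (!stopped)
		{
			std::lock_guard<std::mutex> lock (mutex);
			channels.push_back (channel_a);
			error = false;
		}
		return error;
	}

	// Mirrors ongoing_keepalive: periodic work re-checks the flag before each
	// round; a loop here stands in for re-arming an alarm
	void ongoing_keepalive ()
	{
		while (!stopped)
		{
			// ... send keepalives to every channel ...
			std::this_thread::sleep_for (std::chrono::milliseconds (100));
		}
	}

	// Mirrors tcp_channels::stop: set the flag, then clear the container;
	// the insert guard keeps late callers from repopulating it
	void stop ()
	{
		stopped = true;
		std::lock_guard<std::mutex> lock (mutex);
		channels.clear ();
	}

private:
	std::atomic<bool> stopped{ false };
	std::mutex mutex;
	std::vector<int> channels;
};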