From 9ab00efb1aa9cd35d85fec2aaad734eae71bb41e Mon Sep 17 00:00:00 2001 From: SergiySW Date: Fri, 5 Jul 2019 19:32:39 +0300 Subject: [PATCH 1/2] Cleanup inactive channels from rep crawler --- nano/core_test/node.cpp | 57 +++++++++++++++++++++++++++++++++++++ nano/node/repcrawler.cpp | 50 ++++++++++++++++++++++++++------ nano/node/repcrawler.hpp | 3 ++ nano/node/transport/tcp.cpp | 8 ++++-- 4 files changed, 107 insertions(+), 11 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 67ef0798a2..9586ab6024 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1926,6 +1926,63 @@ TEST (node, rep_weight) ASSERT_EQ (*channel0, reps[0].channel_ref ()); } +TEST (node, rep_remove) +{ + nano::system system (24000, 1); + auto & node (*system.nodes[0]); + // Add inactive UDP representative channel + nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24001); + auto channel0 (std::make_shared (node.network.udp_channels, endpoint0)); + nano::amount amount100 (100); + node.network.udp_channels.insert (endpoint0, nano::protocol_version); + nano::keypair keypair1; + node.rep_crawler.response (channel0, keypair1.pub, amount100); + ASSERT_EQ (1, node.rep_crawler.representative_count ()); + auto reps (node.rep_crawler.representatives (1)); + ASSERT_EQ (1, reps.size ()); + ASSERT_EQ (100, reps[0].weight.number ()); + ASSERT_EQ (keypair1.pub, reps[0].account); + ASSERT_EQ (*channel0, reps[0].channel_ref ()); + // Add working representative + auto node1 = system.add_node (nano::node_config (24002, system.logging)); + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + auto channel1 (node.network.find_channel (node1->network.endpoint ())); + ASSERT_NE (nullptr, channel1); + node.rep_crawler.response (channel1, nano::test_genesis_key.pub, nano::genesis_amount); + ASSERT_EQ (2, node.rep_crawler.representative_count ()); + // Add inactive TCP representative channel + nano::node_init init; + auto node2 (std::make_shared (init, system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (24003, system.logging), system.work)); + std::atomic done{ false }; + std::weak_ptr node_w (node.shared ()); + node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &done](std::shared_ptr channel2) { + if (auto node_l = node_w.lock ()) + { + nano::keypair keypair2; + node_l->rep_crawler.response (channel2, keypair2.pub, nano::Gxrb_ratio); + ASSERT_EQ (3, node_l->rep_crawler.representative_count ()); + done = true; + } + }); + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + node2->stop (); + // Remove inactive representatives + system.deadline_set (10s); + while (node.rep_crawler.representative_count () != 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + reps = node.rep_crawler.representatives (1); + ASSERT_EQ (nano::test_genesis_key.pub, reps[0].account); + ASSERT_EQ (1, node.network.size ()); + auto list (node.network.list (1)); + ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ()); +} + // Test that nodes can disable representative voting TEST (node, no_voting) { diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index cc0191b038..1e6e0fa267 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -36,6 +36,7 @@ void nano::rep_crawler::ongoing_crawl () { auto now (std::chrono::steady_clock::now ()); auto total_weight_l (total_weight ()); + cleanup_reps (); query (get_crawl_targets (total_weight_l)); auto sufficient_weight (total_weight_l > node.config.online_weight_minimum.number ()); // If online weight drops below minimum, reach out to preconfigured peers @@ -56,22 +57,14 @@ void nano::rep_crawler::ongoing_crawl () std::vector> nano::rep_crawler::get_crawl_targets (nano::uint128_t total_weight_a) { - std::unordered_set> channels; constexpr size_t conservative_count = 10; constexpr size_t aggressive_count = 40; // Crawl more aggressively if we lack sufficient total peer weight. bool sufficient_weight (total_weight_a > node.config.online_weight_minimum.number ()); uint16_t required_peer_count = sufficient_weight ? conservative_count : aggressive_count; - std::lock_guard lock (probable_reps_mutex); - - // First, add known rep endpoints, ordered by ascending last-requested time. - for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n && channels.size () < required_peer_count; ++i) - { - channels.insert (i->channel); - }; - // Add additional random peers. We do this even if we have enough weight, in order to pick up reps + // Add random peers. We do this even if we have enough weight, in order to pick up reps // that didn't respond when first observed. If the current total weight isn't sufficient, this // will be more aggressive. When the node first starts, the rep container is empty and all // endpoints will originate from random peers. @@ -205,6 +198,45 @@ void nano::rep_crawler::on_rep_request (std::shared_ptr> channels; + { + // Check known rep channels + std::lock_guard lock (probable_reps_mutex); + for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n; ++i) + { + channels.push_back (i->channel); + } + } + // Remove reps with inactive channels + for (auto i : channels) + { + bool equal (false); + if (i->get_type () == nano::transport::transport_type::tcp) + { + auto find_channel (node.network.tcp_channels.find_channel (i->get_tcp_endpoint ())); + if (find_channel != nullptr && *find_channel == *dynamic_cast (i.get ())) + { + equal = true; + } + } + else if (i->get_type () == nano::transport::transport_type::udp) + { + auto find_channel (node.network.udp_channels.channel (i->get_endpoint ())); + if (find_channel != nullptr && *find_channel == *dynamic_cast (i.get ())) + { + equal = true; + } + } + if (!equal) + { + std::lock_guard lock (probable_reps_mutex); + probable_reps.get ().erase (*i); + } + } +} + std::vector nano::rep_crawler::representatives (size_t count_a) { std::vector result; diff --git a/nano/node/repcrawler.hpp b/nano/node/repcrawler.hpp index a964e00d71..a673b0d50a 100644 --- a/nano/node/repcrawler.hpp +++ b/nano/node/repcrawler.hpp @@ -130,6 +130,9 @@ class rep_crawler /** When a rep request is made, this is called to update the last-request timestamp. */ void on_rep_request (std::shared_ptr channel_a); + /** Clean representatives with inactive channels */ + void cleanup_reps (); + /** Protects the probable_reps container */ mutable std::mutex probable_reps_mutex; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index cfe245a70e..c7b34c77ec 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -91,7 +91,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr lock (mutex); auto existing (channels.get ().find (endpoint)); @@ -340,6 +340,7 @@ void nano::transport::tcp_channels::stop () i->channel->response_server = nullptr; } } + channels.clear (); } bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a) @@ -428,7 +429,10 @@ void nano::transport::tcp_channels::ongoing_keepalive () node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() { if (auto node_l = node_w.lock ()) { - node_l->network.tcp_channels.ongoing_keepalive (); + if (!node_l->network.tcp_channels.stopped) + { + node_l->network.tcp_channels.ongoing_keepalive (); + } } }); } From d0774e6e1d4ddbe13244b040c94aaa71770fac3e Mon Sep 17 00:00:00 2001 From: SergiySW Date: Fri, 5 Jul 2019 21:36:55 +0300 Subject: [PATCH 2/2] Use static_cast --- nano/node/repcrawler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index 1e6e0fa267..265510feb7 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -216,7 +216,7 @@ void nano::rep_crawler::cleanup_reps () if (i->get_type () == nano::transport::transport_type::tcp) { auto find_channel (node.network.tcp_channels.find_channel (i->get_tcp_endpoint ())); - if (find_channel != nullptr && *find_channel == *dynamic_cast (i.get ())) + if (find_channel != nullptr && *find_channel == *static_cast (i.get ())) { equal = true; } @@ -224,7 +224,7 @@ void nano::rep_crawler::cleanup_reps () else if (i->get_type () == nano::transport::transport_type::udp) { auto find_channel (node.network.udp_channels.channel (i->get_endpoint ())); - if (find_channel != nullptr && *find_channel == *dynamic_cast (i.get ())) + if (find_channel != nullptr && *find_channel == *static_cast (i.get ())) { equal = true; }