Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry results not correctly utilising cache timeouts #2650

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nano/core_test/node_telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ TEST (node_telemetry, all_peers_use_single_request_cache)
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));

std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::this_thread::sleep_for (node_server->telemetry->cache_plus_buffer_cutoff_time ());

// Should be empty
responses = node_client->telemetry->get_metrics ();
Expand Down
109 changes: 82 additions & 27 deletions nano/node/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <numeric>
#include <set>

using namespace std::chrono_literals;

nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a) :
network (network_a),
alarm (alarm_a),
Expand Down Expand Up @@ -56,34 +58,69 @@ void nano::telemetry::set (nano::telemetry_data const & telemetry_data_a, nano::
}
}

std::chrono::milliseconds nano::telemetry::cache_plus_buffer_cutoff_time () const
{
	// This includes the cache lifetime, the waiting time for the response, and a buffer
	// (1 second) allowing for the alarm operation to be scheduled and completed.
	return cache_cutoff + response_time_cutoff + 1s;
}

bool nano::telemetry::within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const
{
	// A cached entry is usable only once at least one response has been received,
	// and only while it has not aged past the cache + response + buffer window.
	auto const deadline = telemetry_info.last_response + cache_plus_buffer_cutoff_time ();
	return !telemetry_info.awaiting_first_response () && std::chrono::steady_clock::now () <= deadline;
}

bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info) const
{
	// A cached entry counts as fresh only after the first response has arrived
	// and while the last response is newer than the cache cutoff window.
	auto is_within = (telemetry_info.last_response + cache_cutoff) >= std::chrono::steady_clock::now ();
	return !telemetry_info.awaiting_first_response () && is_within;
}

void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval)
{
// Check if any peers actually need requesting
alarm.add (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr<telemetry> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
// Check if there are any peers in the peers list which haven't been requested yet, or whose cached telemetry is at or past the cache cutoff time
if (!this_l->stopped)
{
auto peers = this_l->network.list (std::numeric_limits<size_t>::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false);
// Empty tag type naming the channel-keyed index of the multi-index container below
class tag_channel
{
};

// Wraps a channel so the multi-index container can key entries both by
// endpoint and by the channel pointer itself.
struct channel_wrapper
{
	std::shared_ptr<nano::transport::channel> channel;
	// Intentionally non-explicit: channels returned by network.list () are
	// inserted into the container directly via this converting constructor.
	channel_wrapper (std::shared_ptr<nano::transport::channel> const & channel_a) :
	channel (channel_a)
	{
	}
	// Endpoint key extractor used by the hashed endpoint index
	nano::endpoint endpoint () const
	{
		return channel->get_endpoint ();
	}
};

namespace mi = boost::multi_index;
boost::multi_index_container<channel_wrapper,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::const_mem_fun<channel_wrapper, nano::endpoint, &channel_wrapper::endpoint>>,
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<channel_wrapper, std::shared_ptr<nano::transport::channel>, &channel_wrapper::channel>>>>
peers;

{
auto temp_peers = this_l->network.list (std::numeric_limits<size_t>::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false);
peers.insert (temp_peers.begin (), temp_peers.end ());
}

{
// Cleanup any stale saved telemetry data for non-existent peers
nano::lock_guard<std::mutex> guard (this_l->mutex);
for (auto it = this_l->recent_or_initial_request_telemetry_data.begin (); it != this_l->recent_or_initial_request_telemetry_data.end ();)
{
if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && temp_peers.count (it->endpoint) == 0)
if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && peers.count (it->endpoint) == 0)
{
it = this_l->recent_or_initial_request_telemetry_data.erase (it);
}
Expand All @@ -93,32 +130,48 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ
}
}

peers.erase (std::remove_if (peers.begin (), peers.end (), [&this_l](auto const & channel_a) {
// Remove from peers list if it exists and is within the cache cutoff
auto it = this_l->recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
return it != this_l->recent_or_initial_request_telemetry_data.end () && this_l->within_cache_cutoff (*it);
}),
peers.end ());
// Remove from peers list if it exists and is within the cache cutoff
for (auto peers_it = peers.begin (); peers_it != peers.end ();)
{
auto it = this_l->recent_or_initial_request_telemetry_data.find (peers_it->endpoint ());
if (it != this_l->recent_or_initial_request_telemetry_data.cend () && this_l->within_cache_cutoff (*it))
{
peers_it = peers.erase (peers_it);
}
else
{
++peers_it;
}
}
}

// Request data from new peers, or ones which are out of date
for (auto const & peer : peers)
for (auto const & peer : boost::make_iterator_range (peers))
{
this_l->get_metrics_single_peer_async (peer, [](auto const &) {
this_l->get_metrics_single_peer_async (peer.channel, [](auto const &) {
// Intentionally empty, just using to refresh the cache
});
}

nano::lock_guard<std::mutex> guard (this_l->mutex);
long long next_round = std::chrono::duration_cast<std::chrono::milliseconds> (nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network)).count ();
long long next_round = std::chrono::duration_cast<std::chrono::milliseconds> (this_l->cache_cutoff + this_l->response_time_cutoff).count ();
if (!this_l->recent_or_initial_request_telemetry_data.empty ())
{
// Use the default request time unless a telemetry request cache expires sooner
auto const cache_cutoff = nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network);
auto const last_request = this_l->recent_or_initial_request_telemetry_data.get<tag_last_updated> ().begin ()->last_request;
if (std::chrono::steady_clock::now () > last_request + cache_cutoff)
// Find the soonest time at which a cached response (for a peer not being re-requested) expires
auto range = boost::make_iterator_range (this_l->recent_or_initial_request_telemetry_data.get<tag_last_updated> ());
for (auto i : range)
{
next_round = std::min<long long> (next_round, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::steady_clock::now () - (last_request + cache_cutoff)).count ());
if (peers.count (i.endpoint) == 0)
{
auto const last_response = i.last_response;
auto now = std::chrono::steady_clock::now ();
if (now > last_response + this_l->cache_cutoff)
{
next_round = std::min<long long> (next_round, std::chrono::duration_cast<std::chrono::milliseconds> (now - (last_response + this_l->cache_cutoff)).count ());
}
break;
}
}
}

Expand All @@ -134,10 +187,9 @@ std::unordered_map<nano::endpoint, nano::telemetry_data> nano::telemetry::get_me

nano::lock_guard<std::mutex> guard (mutex);
auto range = boost::make_iterator_range (recent_or_initial_request_telemetry_data);

// clang-format off
nano::transform_if (range.begin (), range.end (), std::inserter (telemetry_data, telemetry_data.end ()),
[this](auto const & telemetry_info) { return this->within_cache_cutoff (telemetry_info); },
[this](auto const & telemetry_info) { return this->within_cache_plus_buffer_cutoff (telemetry_info); },
[](auto const & telemetry_info) { return std::pair<const nano::endpoint, nano::telemetry_data>{ telemetry_info.endpoint, telemetry_info.data }; });
// clang-format on

Expand Down Expand Up @@ -195,7 +247,6 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.undergoing_request = true;
telemetry_info_a.last_request = std::chrono::steady_clock::now ();
});
}
callbacks[it->endpoint].push_back (callback_a);
Expand Down Expand Up @@ -271,8 +322,12 @@ void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::cha

void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a)
{
if (recent_or_initial_request_telemetry_data.count (endpoint_a) > 0)
auto it = recent_or_initial_request_telemetry_data.find (endpoint_a);
if (it != recent_or_initial_request_telemetry_data.end ())
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.last_response = std::chrono::steady_clock::now ();
});
if (error_a)
{
recent_or_initial_request_telemetry_data.erase (endpoint_a);
Expand Down Expand Up @@ -328,10 +383,10 @@ size_t nano::telemetry::telemetry_data_size ()
return recent_or_initial_request_telemetry_data.size ();
}

// Constructs a cache entry for a peer's telemetry data; last_response_a records
// when the data was received so cache-cutoff checks can age it out.
nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_response_a, bool undergoing_request_a) :
endpoint (endpoint_a),
data (data_a),
last_response (last_response_a),
undergoing_request (undergoing_request_a)
{
}
Expand Down
14 changes: 11 additions & 3 deletions nano/node/telemetry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class telemetry_info final
{
public:
telemetry_info () = default;
telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_request, bool undergoing_request);
telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_response, bool undergoing_request);
bool awaiting_first_response () const;

nano::endpoint endpoint;
nano::telemetry_data data;
std::chrono::steady_clock::time_point last_request;
std::chrono::steady_clock::time_point last_response;
bool undergoing_request{ false };
uint64_t round{ 0 };
};
Expand Down Expand Up @@ -87,6 +87,11 @@ class telemetry : public std::enable_shared_from_this<telemetry>
*/
size_t telemetry_data_size ();

/*
* Returns the time for the cache, response and a small buffer for alarm operations to be scheduled and completed
*/
std::chrono::milliseconds cache_plus_buffer_cutoff_time () const;

private:
class tag_endpoint
{
Expand All @@ -111,11 +116,13 @@ class telemetry : public std::enable_shared_from_this<telemetry>
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::member<nano::telemetry_info, nano::endpoint, &nano::telemetry_info::endpoint>>,
mi::ordered_non_unique<mi::tag<tag_last_updated>,
mi::member<nano::telemetry_info, std::chrono::steady_clock::time_point, &nano::telemetry_info::last_request>>>> recent_or_initial_request_telemetry_data;
mi::member<nano::telemetry_info, std::chrono::steady_clock::time_point, &nano::telemetry_info::last_response>>>> recent_or_initial_request_telemetry_data;
// clang-format on

// Anything older than this requires requesting metrics from other nodes.
std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };

// The maximum time spent waiting for a response to a telemetry request
std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3 };

std::unordered_map<nano::endpoint, std::vector<std::function<void(telemetry_data_response const &)>>> callbacks;
Expand All @@ -128,6 +135,7 @@ class telemetry : public std::enable_shared_from_this<telemetry>
void invoke_callbacks (nano::endpoint const &, bool);

bool within_cache_cutoff (nano::telemetry_info const &) const;
bool within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const;
friend std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
};

Expand Down
2 changes: 1 addition & 1 deletion nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ TEST (node_telemetry, ongoing_requests)

// Wait till the next ongoing will be called, and add a 1s buffer for the actual processing
auto time = std::chrono::steady_clock::now ();
while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + 1s))
while (std::chrono::steady_clock::now () < (time + node_client->telemetry->cache_plus_buffer_cutoff_time () + 1s))
{
ASSERT_NO_ERROR (system.poll ());
}
Expand Down