Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bandwidth limiting #2035

Merged
merged 33 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8d8a6b1
bandwidth_limit in bytes added to config
May 27, 2019
4e2049d
bandwidth limiter class
May 27, 2019
cfe89a1
hook to nano::transport::channel
May 27, 2019
2aa91e3
Merge branch 'master' into network_limit
May 27, 2019
c163f01
initialize array with brace-enclosed initializer
May 27, 2019
d9a9ba3
Merge branch 'master' into network_limit
May 27, 2019
7d39876
correctly 1.5Mb
May 27, 2019
a9d39df
update tests to account for correct math and full confirm_ack confirm…
May 27, 2019
920c0a0
update config test
May 27, 2019
ba99e9f
formatting
May 27, 2019
1fbd0d2
update default value in error text
May 27, 2019
a8d3f8d
add logging message and convenience detail_raw_to_string for dropped …
May 28, 2019
f4b016a
should always log dropped messages, this could be improved with a log…
May 28, 2019
416e2a7
stuff under logging;packet logging
May 28, 2019
4e4702d
Merge branch 'master' into network_limit
May 28, 2019
78f074f
typo and log out limit on node start
May 28, 2019
ef8f5db
remove duplicate
May 28, 2019
2eefa02
Readability and simplification
May 29, 2019
c207c6e
Merge branch 'master' into network_limit
May 30, 2019
89306dd
use static constexpr in function
May 31, 2019
2518673
unsigned int < 0 always false
May 31, 2019
053887f
add republish_vote to could_drop
May 31, 2019
959054d
trend rate over minimum of 1 sec
Jun 1, 2019
e0c64bf
update test to validate ramp down to 0 rate again
Jun 1, 2019
0052dc9
merge master
Jun 7, 2019
bfbe9ad
remove unused variables in tests
Jun 10, 2019
48ad5a4
clean up tests and add multiple limiters
Jun 10, 2019
7f29d21
Merge branch 'master' into network_limit
Jun 10, 2019
85a6d80
formatting
Jun 10, 2019
fb4b334
fix merge issues
Jun 11, 2019
2b95564
allow dropping of keep alives
Jun 11, 2019
a4d7373
whitespace
Jun 11, 2019
24b7b65
use send instead of send_buffer a couple more places
Jun 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 8 additions & 58 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2211,10 +2211,6 @@ TEST (network, replace_port)
TEST (bandwidth_limiter, validate)
{
size_t full_confirm_ack (488 + 8);
size_t full_confirm_req (385 + 8);

auto detail_req = nano::stat::detail::confirm_req;
auto detail_ack = nano::stat::detail::confirm_ack;

// test unbounded
{
Expand All @@ -2223,7 +2219,7 @@ TEST (bandwidth_limiter, validate)

while (now + 1s >= std::chrono::steady_clock::now ())
{
ASSERT_FALSE (limiter.should_drop (detail_ack, full_confirm_ack));
ASSERT_FALSE (limiter.should_drop (full_confirm_ack));
std::this_thread::sleep_for (50ms);
}
}
Expand All @@ -2235,14 +2231,15 @@ TEST (bandwidth_limiter, validate)

while (now + 1s >= std::chrono::steady_clock::now ())
{
auto should_drop (limiter.should_drop (detail_ack, full_confirm_ack));
auto should_drop (limiter.should_drop (full_confirm_ack));
std::this_thread::sleep_for (50ms);
}

ASSERT_LT (limiter.get_rate (), config.bandwidth_limit);
//adding another should not drop as 1s has elapsed
ASSERT_EQ (limiter.should_drop (detail_ack, full_confirm_ack), 0);
//rate should have reset when time was reset and should now be 144 bytes
ASSERT_EQ (limiter.should_drop (full_confirm_ack), 0);
std::this_thread::sleep_for (1s);
//rate should have reset when time was reset and should now be full_confirm_ack
ASSERT_EQ (limiter.get_rate (), full_confirm_ack);
}
// test bounded 3Mb
Expand All @@ -2253,63 +2250,16 @@ TEST (bandwidth_limiter, validate)

while (now + 1s >= std::chrono::steady_clock::now ())
{
auto should_drop (limiter.should_drop (detail_ack, full_confirm_ack));
auto should_drop (limiter.should_drop (full_confirm_ack));
std::this_thread::sleep_for (50ms);
}

//max we can send per second would be 20 messages of payload size 144 bytes passing 3Mbps drops messages
ASSERT_LT (limiter.get_rate (), config.bandwidth_limit * 2);
//adding another should not drop as 1s has elapsed
ASSERT_EQ (limiter.should_drop (detail_ack, full_confirm_ack), 0);
ASSERT_EQ (limiter.should_drop (full_confirm_ack), 0);
//rate should have reset when time was reset and should now be 144 bytes
std::this_thread::sleep_for (1s);
ASSERT_EQ (limiter.get_rate (), full_confirm_ack);
}
// test unbounded req
{
nano::bandwidth_limiter limiter (0);
auto now (std::chrono::steady_clock::now ());

while (now + 1s >= std::chrono::steady_clock::now ())
{
ASSERT_FALSE (limiter.should_drop (detail_req, full_confirm_req));
std::this_thread::sleep_for (50ms);
}
}
// test bounded default req
{
nano::node_config config;
nano::bandwidth_limiter limiter (config.bandwidth_limit);
auto now (std::chrono::steady_clock::now ());

while (now + 1s >= std::chrono::steady_clock::now ())
{
auto should_drop (limiter.should_drop (detail_req, full_confirm_req));
std::this_thread::sleep_for (50ms);
}

ASSERT_LT (limiter.get_rate (), config.bandwidth_limit);
//adding another should not drop as 1s has elapsed
ASSERT_EQ (limiter.should_drop (detail_req, full_confirm_req), 0);
//rate should have reset when time was reset and should now be 144 bytes
ASSERT_EQ (limiter.get_rate (), full_confirm_req);
}
// test bounded 3Mb req
{
nano::node_config config;
nano::bandwidth_limiter limiter (config.bandwidth_limit * 2);
auto now (std::chrono::steady_clock::now ());

while (now + 1s >= std::chrono::steady_clock::now ())
{
auto should_drop (limiter.should_drop (detail_req, full_confirm_req));
std::this_thread::sleep_for (50ms);
}

//max we can send per second would be 20 messages of payload size 144 bytes passing 3Mbps drops messages
ASSERT_LT (limiter.get_rate (), config.bandwidth_limit * 2);
//adding another should not drop as 1s has elapsed
ASSERT_EQ (limiter.should_drop (detail_req, full_confirm_req), 0);
//rate should have reset when time was reset and should now be 144 bytes
ASSERT_EQ (limiter.get_rate (), full_confirm_req);
}
}
12 changes: 8 additions & 4 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ void nano::frontier_req_client::run ()
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while sending bootstrap request %1%") % ec.message ()));
}
}
});
},
true); // is bootstrap traffic
}

std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
Expand Down Expand Up @@ -345,7 +346,8 @@ void nano::bulk_pull_client::request ()
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in);
}
});
},
true); // is bootstrap traffic
}

void nano::bulk_pull_client::receive_block ()
Expand Down Expand Up @@ -530,7 +532,8 @@ void nano::bulk_push_client::start ()
this_l->connection->node->logger.try_log (boost::str (boost::format ("Unable to send bulk_push request: %1%") % ec.message ()));
}
}
});
},
true); // is bootstrap traffic
}

void nano::bulk_push_client::push (nano::transaction const & transaction_a)
Expand Down Expand Up @@ -667,7 +670,8 @@ void nano::bulk_pull_account_client::request ()
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in);
}
});
},
true); // is bootstrap traffic
}

void nano::bulk_pull_account_client::receive_pending ()
Expand Down
6 changes: 3 additions & 3 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> ch
{
nano::keepalive message;
random_fill (message.peers);
channel_a->send (message);
channel_a->send (message, nullptr, true); // is bootstrap traffic
}

void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channel> channel_a)
Expand All @@ -100,7 +100,7 @@ void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channe
message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ());
}
}
channel_a->send (message);
channel_a->send (message, nullptr, true); // is bootstrap traffic
}

void nano::network::send_node_id_handshake (std::shared_ptr<nano::transport::channel> channel_a, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to)
Expand All @@ -116,7 +116,7 @@ void nano::network::send_node_id_handshake (std::shared_ptr<nano::transport::cha
{
node.logger.try_log (boost::str (boost::format ("Node ID handshake sent with node ID %1% to %2%: query %3%, respond_to %4% (signature %5%)") % node.node_id.pub.to_account () % channel_a->get_endpoint () % (query ? query->to_string () : std::string ("[none]")) % (respond_to ? respond_to->to_string () : std::string ("[none]")) % (response ? response->second.to_string () : std::string ("[none]"))));
}
channel_a->send (message);
channel_a->send (message, nullptr, true); // is node_id_handshake
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
for (auto & channel : send_list)
{
std::weak_ptr<nano::node> node_w (node.shared ());
channel->send (message);
channel->send (message, nullptr, true); // is ongoing keepalive;
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() {
Expand Down
24 changes: 14 additions & 10 deletions nano/node/transport/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/transport.hpp>

#include <numeric>

namespace
{
class callback_visitor : public nano::message_visitor
Expand Down Expand Up @@ -73,13 +75,13 @@ node (node_a)
{
}

void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, bool const & is_dropable)
{
callback_visitor visitor;
message_a.visit (visitor);
auto buffer (message_a.to_bytes ());
auto detail (visitor.result);
if (!limiter.should_drop (detail, buffer->size ()))
if (is_dropable || !limiter.should_drop (buffer->size ()))
{
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
Expand Down Expand Up @@ -205,28 +207,30 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool
nano::bandwidth_limiter::bandwidth_limiter (const size_t limit_a) :
last_poll (std::chrono::steady_clock::now ()),
limit (limit_a),
rate (0)
rate (0),
trended_rate (0)
{
}

bool nano::bandwidth_limiter::should_drop (const nano::stat::detail & detail_a, const size_t & message_size)
bool nano::bandwidth_limiter::should_drop (const size_t & message_size)
{
using namespace std::chrono_literals;
bool result (false);
static constexpr std::array<nano::stat::detail, 4> could_drop{ nano::stat::detail::confirm_req, nano::stat::detail::confirm_ack, nano::stat::detail::publish, nano::stat::detail::republish_vote };
auto should_keep (std::find (std::begin (could_drop), std::end (could_drop), detail_a));
if (limit == 0 || should_keep == std::end (could_drop))
if (limit == 0) //never drop if limit is 0
{
return result;
}
std::lock_guard<std::mutex> lock (mutex);

if (last_poll + 1s < std::chrono::steady_clock::now ())
if (last_poll + 50ms < std::chrono::steady_clock::now ())
{
last_poll = std::chrono::steady_clock::now ();
rate_buffer.push_back (rate);
auto sum = std::accumulate (rate_buffer.begin (), rate_buffer.end (), 0);
trended_rate = sum / rate_buffer.size ();
rate = 0;
}
if (message_size + rate > limit)
if (trended_rate > limit)
{
result = true;
}
Expand All @@ -240,5 +244,5 @@ bool nano::bandwidth_limiter::should_drop (const nano::stat::detail & detail_a,
size_t nano::bandwidth_limiter::get_rate ()
{
std::lock_guard<std::mutex> lock (mutex);
return rate;
return trended_rate;
}
8 changes: 6 additions & 2 deletions nano/node/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ class bandwidth_limiter final
public:
// initialize with rate 0 = unbounded
bandwidth_limiter (const size_t);
bool should_drop (const nano::stat::detail &, const size_t &);
bool should_drop (const size_t &);
size_t get_rate ();

private:
//last time rate was adjusted
std::chrono::steady_clock::time_point last_poll;
//trend rate over 20 poll periods
boost::circular_buffer<size_t> rate_buffer{ 20, 0 };
//limit bandwidth to
size_t limit;
//rate, increment if message_size + rate < rate
size_t rate;
//trended rate to even out spikes in traffic
size_t trended_rate;
std::mutex mutex;
};
namespace transport
Expand All @@ -49,7 +53,7 @@ namespace transport
virtual ~channel () = default;
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr);
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, bool const & = false);
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::string to_string () const = 0;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ void nano::transport::udp_channels::ongoing_keepalive ()
for (auto i (channels.get<last_packet_received_tag> ().begin ()); i != keepalive_cutoff; ++i)
{
i->channel->set_last_packet_sent (std::chrono::steady_clock::now ());
i->channel->send (message);
i->channel->send (message, nullptr, true); // is ongoing keepalive
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() {
Expand Down