From 5f36d08a90112bc8b118bf1331a1d66192d15fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 14:32:45 +0200 Subject: [PATCH 1/8] Backoff for block broadcasting interval --- nano/node/local_block_broadcaster.cpp | 65 ++++++++++++++++----------- nano/node/local_block_broadcaster.hpp | 53 ++++++++++++---------- 2 files changed, 69 insertions(+), 49 deletions(-) diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 4e97d1eb50..09bbff8c44 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -12,7 +12,8 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan block_processor{ block_processor_a }, network{ network_a }, stats{ stats_a }, - enabled{ enabled_a } + enabled{ enabled_a }, + limiter{ config.broadcast_rate_limit, config.broadcast_rate_burst_ratio } { if (!enabled) { @@ -26,9 +27,20 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan // Only rebroadcast local blocks that were successfully processed (no forks or gaps) if (result == nano::block_status::progress && context.source == nano::block_source::local) { + release_assert (context.block != nullptr); + nano::lock_guard guard{ mutex }; + local_blocks.emplace_back (local_entry{ context.block, std::chrono::steady_clock::now () }); stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::insert); + + // Erase oldest blocks if the queue gets too big + while (local_blocks.size () > config.max_size) + { + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest); + local_blocks.pop_front (); + } + should_notify = true; } } @@ -43,6 +55,8 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan auto erased = local_blocks.get ().erase (block->hash ()); stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased); }); + + // TODO: Listen for cemented callback } nano::local_block_broadcaster::~local_block_broadcaster () @@ -83,32 +97,47 @@ void nano::local_block_broadcaster::run () { stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop); - condition.wait_for (lock, check_interval); + condition.wait_for (lock, 1s); debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds - if (!stopped) + if (!stopped && !local_blocks.empty ()) { cleanup (); run_broadcasts (lock); - debug_assert (lock.owns_lock ()); + debug_assert (!lock.owns_lock ()); + lock.lock (); } } } +std::chrono::milliseconds nano::local_block_broadcaster::rebroadcast_interval (unsigned rebroadcasts) const +{ + return std::min (config.rebroadcast_interval * rebroadcasts, config.max_rebroadcast_interval); +} + void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock & lock) { debug_assert (lock.owns_lock ()); - std::vector> to_broadcast; + std::deque> to_broadcast; auto const now = std::chrono::steady_clock::now (); - for (auto & entry : local_blocks) + + // Iterate blocks with next_broadcast <= now + auto & by_broadcast = local_blocks.get (); + for (auto it = by_broadcast.begin (), end = by_broadcast.upper_bound (now); it != end;) { - if (elapsed (entry.last_broadcast, broadcast_interval, now)) - { + debug_assert (it->next_broadcast <= now); + + release_assert (it->block != nullptr); + to_broadcast.push_back (it->block); + + bool success = by_broadcast.modify (it++, [this, now] (auto & entry) { + entry.rebroadcasts += 1; entry.last_broadcast = now; - to_broadcast.push_back (entry.block); - } + entry.next_broadcast = now + rebroadcast_interval (entry.rebroadcasts); + }); + release_assert (success, "modify failed"); // Should never fail } lock.unlock (); @@ -128,21 +157,12 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock max_size) - { - stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest); - local_blocks.pop_front (); - } - // TODO: Mutex is held during IO, but it should be fine since it's not performance critical auto transaction = node.ledger.tx_begin_read (); erase_if (local_blocks, [this, &transaction] (auto const & entry) { @@ -169,9 +189,4 @@ std::unique_ptr nano::local_block_broadcaster::c auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) })); return composite; -} - -nano::block_hash nano::local_block_broadcaster::local_entry::hash () const -{ - return block->hash (); -} +} \ No newline at end of file diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index b4e7093dd6..89ff706b2c 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include #include +#include #include #include @@ -20,24 +22,23 @@ namespace mi = boost::multi_index; namespace nano { -class node; -class network; -} - -namespace nano +class local_block_broadcaster_config { +public: + // TODO: Make these configurable + static std::size_t constexpr max_size{ 1024 * 8 }; + static std::chrono::seconds constexpr rebroadcast_interval{ 3 }; + static std::chrono::seconds constexpr max_rebroadcast_interval{ 60 }; + static std::size_t constexpr broadcast_rate_limit{ 32 }; + static double constexpr broadcast_rate_burst_ratio{ 3 }; +}; + /** * Broadcasts blocks to the network * Tracks local blocks for more aggressive propagation */ class local_block_broadcaster { - enum class broadcast_strategy - { - normal, - aggressive, - }; - public: local_block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::stats &, bool enabled = false); ~local_block_broadcaster (); @@ -51,8 +52,10 @@ class local_block_broadcaster void run (); void run_broadcasts (nano::unique_lock &); void cleanup (); + std::chrono::milliseconds rebroadcast_interval (unsigned rebroadcasts) const; private: // Dependencies + local_block_broadcaster_config const config{}; // TODO: Pass in constructor nano::node & node; nano::block_processor & block_processor; nano::network & network; @@ -61,22 +64,31 @@ class local_block_broadcaster private: struct local_entry { - std::shared_ptr const block; - std::chrono::steady_clock::time_point const arrival; - mutable std::chrono::steady_clock::time_point last_broadcast{}; // Not part of any index + std::shared_ptr block; + std::chrono::steady_clock::time_point arrival; - nano::block_hash hash () const; + std::chrono::steady_clock::time_point last_broadcast{}; + std::chrono::steady_clock::time_point next_broadcast{}; + unsigned rebroadcasts{ 0 }; + + nano::block_hash hash () const + { + return block->hash (); + } }; // clang-format off class tag_sequenced {}; class tag_hash {}; + class tag_broadcast {}; using ordered_locals = boost::multi_index_container>, mi::hashed_unique, - mi::const_mem_fun> + mi::const_mem_fun>, + mi::ordered_non_unique, + mi::member> >>; // clang-format on @@ -85,18 +97,11 @@ class local_block_broadcaster private: bool enabled{ false }; - nano::bandwidth_limiter limiter{ broadcast_rate_limit, broadcast_rate_burst_ratio }; + nano::bandwidth_limiter limiter; std::atomic stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex; std::thread thread; - - // TODO: Make these configurable - static std::size_t constexpr max_size{ 1024 * 8 }; - static std::chrono::seconds constexpr check_interval{ 30 }; - static std::chrono::seconds constexpr broadcast_interval{ 60 }; - static std::size_t constexpr broadcast_rate_limit{ 32 }; - static double constexpr broadcast_rate_burst_ratio{ 3 }; }; } From 9f91fb68f69c0b05c55fc248cc23ef409d5a0675 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 15:46:18 +0200 Subject: [PATCH 2/8] Listen for cemented event --- nano/node/fwd.hpp | 1 + nano/node/local_block_broadcaster.cpp | 13 ++++++++++--- nano/node/local_block_broadcaster.hpp | 4 +++- nano/node/node.cpp | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 0da465ae22..9d6d6de9ea 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -7,6 +7,7 @@ namespace nano { class active_elections; +class confirming_set; class ledger; class local_vote_history; class logger; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 09bbff8c44..30dc19c486 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -2,16 +2,19 @@ #include #include #include +#include #include #include #include #include -nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) : +nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) : node{ node_a }, block_processor{ block_processor_a }, network{ network_a }, + confirming_set{ confirming_set_a }, stats{ stats_a }, + logger{ logger_a }, enabled{ enabled_a }, limiter{ config.broadcast_rate_limit, config.broadcast_rate_burst_ratio } { @@ -53,10 +56,14 @@ nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nan block_processor.rolled_back.add ([this] (auto const & block) { nano::lock_guard guard{ mutex }; auto erased = local_blocks.get ().erase (block->hash ()); - stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased); + stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased); }); - // TODO: Listen for cemented callback + confirming_set.cemented_observers.add ([this] (auto const & block) { + nano::lock_guard guard{ mutex }; + auto erased = local_blocks.get ().erase (block->hash ()); + stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::cemented, erased); + }); } nano::local_block_broadcaster::~local_block_broadcaster () diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index 89ff706b2c..c92643cce5 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -40,7 +40,7 @@ class local_block_broadcaster_config class local_block_broadcaster { public: - local_block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::stats &, bool enabled = false); + local_block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false); ~local_block_broadcaster (); void start (); @@ -59,7 +59,9 @@ class local_block_broadcaster nano::node & node; nano::block_processor & block_processor; nano::network & network; + nano::confirming_set & confirming_set; nano::stats & stats; + nano::logger & logger; private: struct local_entry diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a2521e49cf..3344b99f1a 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -217,7 +217,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy ascendboot{ config, block_processor, ledger, network, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, - local_block_broadcaster{ *this, block_processor, network, stats, !flags.disable_block_processor_republishing }, + local_block_broadcaster{ *this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }, peer_history_impl{ std::make_unique (config.peer_history, store, network, logger, stats) }, peer_history{ *peer_history_impl }, From e2cb41059994015c81dddb025c8fe2c1d8cce368 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 15:58:19 +0200 Subject: [PATCH 3/8] Cleanup rework --- nano/node/local_block_broadcaster.cpp | 50 ++++++++++++++++++--------- nano/node/local_block_broadcaster.hpp | 15 ++++---- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 30dc19c486..ef01edeb30 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -102,14 +102,19 @@ void nano::local_block_broadcaster::run () nano::unique_lock lock{ mutex }; while (!stopped) { - stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop); - condition.wait_for (lock, 1s); debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds if (!stopped && !local_blocks.empty ()) { - cleanup (); + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop); + + if (cleanup_interval.elapsed (config.cleanup_interval)) + { + cleanup (lock); + debug_assert (lock.owns_lock ()); + } + run_broadcasts (lock); debug_assert (!lock.owns_lock ()); lock.lock (); @@ -161,31 +166,42 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock & lock) { debug_assert (!mutex.try_lock ()); - // TODO: Mutex is held during IO, but it should be fine since it's not performance critical - auto transaction = node.ledger.tx_begin_read (); - erase_if (local_blocks, [this, &transaction] (auto const & entry) { - transaction.refresh_if_needed (); + // Copy the local blocks to avoid holding the mutex during IO + auto local_blocks_copy = local_blocks; + + lock.unlock (); - if (entry.last_broadcast == std::chrono::steady_clock::time_point{}) + std::set already_confirmed; + { + auto transaction = node.ledger.tx_begin_read (); + for (auto const & entry : local_blocks_copy) { // This block has never been broadcasted, keep it so it's broadcasted at least once - return false; - } - if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ())) - { - stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_confirmed); - return true; + if (entry.last_broadcast == std::chrono::steady_clock::time_point{}) + { + continue; + } + if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ())) + { + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::already_confirmed); + already_confirmed.insert (entry.block->hash ()); + } } - return false; + } + + lock.lock (); + + // Erase blocks that have been confirmed + erase_if (local_blocks, [&already_confirmed] (auto const & entry) { + return already_confirmed.contains (entry.block->hash ()); }); } diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index c92643cce5..01f9f79fdf 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -26,11 +26,12 @@ class local_block_broadcaster_config { public: // TODO: Make these configurable - static std::size_t constexpr max_size{ 1024 * 8 }; - static std::chrono::seconds constexpr rebroadcast_interval{ 3 }; - static std::chrono::seconds constexpr max_rebroadcast_interval{ 60 }; - static std::size_t constexpr broadcast_rate_limit{ 32 }; - static double constexpr broadcast_rate_burst_ratio{ 3 }; + std::size_t max_size{ 1024 * 8 }; + std::chrono::seconds rebroadcast_interval{ 3 }; + std::chrono::seconds max_rebroadcast_interval{ 60 }; + std::size_t broadcast_rate_limit{ 32 }; + double broadcast_rate_burst_ratio{ 3 }; + std::chrono::seconds cleanup_interval{ 60 }; }; /** @@ -51,7 +52,7 @@ class local_block_broadcaster private: void run (); void run_broadcasts (nano::unique_lock &); - void cleanup (); + void cleanup (nano::unique_lock &); std::chrono::milliseconds rebroadcast_interval (unsigned rebroadcasts) const; private: // Dependencies @@ -98,8 +99,8 @@ class local_block_broadcaster private: bool enabled{ false }; - nano::bandwidth_limiter limiter; + nano::interval cleanup_interval; std::atomic stopped{ false }; nano::condition_variable condition; From a3e71ba236bcb5833f025f78651f321dcea434f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 16:22:37 +0200 Subject: [PATCH 4/8] Unique pointer --- nano/node/fwd.hpp | 1 + nano/node/node.cpp | 4 +++- nano/node/node.hpp | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 9d6d6de9ea..4a0f903a8a 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -9,6 +9,7 @@ namespace nano class active_elections; class confirming_set; class ledger; +class local_block_broadcaster; class local_vote_history; class logger; class network; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 3344b99f1a..566fc24ccb 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -217,7 +218,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy ascendboot{ config, block_processor, ledger, network, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, - local_block_broadcaster{ *this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing }, + local_block_broadcaster_impl{ std::make_unique (*this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) }, + local_block_broadcaster{ *local_block_broadcaster_impl }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }, peer_history_impl{ std::make_unique (config.peer_history, store, network, logger, stats) }, peer_history{ *peer_history_impl }, diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 994bf1dce3..10494aa83d 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -212,7 +211,8 @@ class node final : public std::enable_shared_from_this nano::bootstrap_ascending::service ascendboot; nano::websocket_server websocket; nano::epoch_upgrader epoch_upgrader; - nano::local_block_broadcaster local_block_broadcaster; + std::unique_ptr local_block_broadcaster_impl; + nano::local_block_broadcaster & local_block_broadcaster; nano::process_live_dispatcher process_live_dispatcher; std::unique_ptr peer_history_impl; nano::peer_history & peer_history; From 56ae4b971e605d303b347e2814a62520d5fd2442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 16:33:11 +0200 Subject: [PATCH 5/8] Config --- nano/node/local_block_broadcaster.cpp | 3 ++- nano/node/local_block_broadcaster.hpp | 21 ++++++++++++++++----- nano/node/node.cpp | 2 +- nano/node/nodeconfig.cpp | 3 ++- nano/node/nodeconfig.hpp | 2 ++ 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index ef01edeb30..0c5e0c3745 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -8,7 +8,8 @@ #include #include -nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) : +nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) : + config{ config_a }, node{ node_a }, block_processor{ block_processor_a }, network{ network_a }, diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index 01f9f79fdf..32a073ca96 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -22,10 +22,21 @@ namespace mi = boost::multi_index; namespace nano { -class local_block_broadcaster_config +class local_block_broadcaster_config final { public: - // TODO: Make these configurable + explicit local_block_broadcaster_config (nano::network_constants const & network) + { + if (network.is_dev_network ()) + { + rebroadcast_interval = 1s; + cleanup_interval = 1s; + } + } + + // TODO: Serialization & deserialization + +public: std::size_t max_size{ 1024 * 8 }; std::chrono::seconds rebroadcast_interval{ 3 }; std::chrono::seconds max_rebroadcast_interval{ 60 }; @@ -38,10 +49,10 @@ class local_block_broadcaster_config * Broadcasts blocks to the network * Tracks local blocks for more aggressive propagation */ -class local_block_broadcaster +class local_block_broadcaster final { public: - local_block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false); + local_block_broadcaster (local_block_broadcaster_config const &, nano::node &, nano::block_processor &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false); ~local_block_broadcaster (); void start (); @@ -56,7 +67,7 @@ class local_block_broadcaster std::chrono::milliseconds rebroadcast_interval (unsigned rebroadcasts) const; private: // Dependencies - local_block_broadcaster_config const config{}; // TODO: Pass in constructor + local_block_broadcaster_config const & config; nano::node & node; nano::block_processor & block_processor; nano::network & network; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 566fc24ccb..90d102fc48 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -218,7 +218,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy ascendboot{ config, block_processor, ledger, network, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, - local_block_broadcaster_impl{ std::make_unique (*this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) }, + local_block_broadcaster_impl{ std::make_unique (config.local_block_broadcaster, *this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) }, local_block_broadcaster{ *local_block_broadcaster_impl }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }, peer_history_impl{ std::make_unique (config.peer_history, store, network, logger, stats) }, diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 683902c75d..0eef9a5674 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -39,7 +39,8 @@ nano::node_config::node_config (const std::optional & peering_port_a, block_processor{ network_params.network }, peer_history{ network_params.network }, tcp{ network_params.network }, - network{ network_params.network } + network{ network_params.network }, + local_block_broadcaster{ network_params.network } { if (peering_port == 0) { diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 1b9fd403bc..a71953b690 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -152,6 +153,7 @@ class node_config nano::request_aggregator_config request_aggregator; nano::message_processor_config message_processor; nano::network_config network; + nano::local_block_broadcaster_config local_block_broadcaster; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; From e942485c3d1e8183e7415cae70b6148fee4b876f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 16:38:48 +0200 Subject: [PATCH 6/8] Logging --- nano/lib/logging_enums.hpp | 1 + nano/node/local_block_broadcaster.cpp | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 780d2b01ba..b2b00b350c 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -79,6 +79,7 @@ enum class type signal_manager, peer_history, message_processor, + local_block_broadcaster, // bootstrap bulk_pull_client, diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 0c5e0c3745..ba2e74a4e8 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -132,7 +132,7 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock> to_broadcast; + std::deque to_broadcast; auto const now = std::chrono::steady_clock::now (); @@ -143,7 +143,7 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_locknext_broadcast <= now); release_assert (it->block != nullptr); - to_broadcast.push_back (it->block); + to_broadcast.push_back (*it); bool success = by_broadcast.modify (it++, [this, now] (auto & entry) { entry.rebroadcasts += 1; @@ -155,7 +155,7 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lockhash ().to_string (), + entry.rebroadcasts); + stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out); - network.flood_block_initial (block); + network.flood_block_initial (entry.block); } } From 1fc9e79b45f995b4944837d5c1ae368345a0f82c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 18:13:48 +0200 Subject: [PATCH 7/8] Tests --- nano/core_test/node.cpp | 39 +++++++++++++++++++++++++++ nano/node/local_block_broadcaster.cpp | 6 +++++ nano/node/local_block_broadcaster.hpp | 2 ++ 3 files changed, 47 insertions(+) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index f40dd6ccc0..4c49d7ed79 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3859,3 +3859,42 @@ TEST (node, process_local_overflow) auto result = node.process_local (send1); ASSERT_FALSE (result); } + +TEST (node, local_block_broadcast) +{ + nano::test::system system; + + // Disable active elections to prevent the block from being broadcasted by the election + auto node_config = system.default_config (); + node_config.active_elections.size = 0; + node_config.local_block_broadcaster.rebroadcast_interval = 1s; + auto & node1 = *system.add_node (node_config); + auto & node2 = *system.make_disconnected_node (); + + nano::keypair key1; + nano::send_block_builder builder; + auto latest_hash = nano::dev::genesis->hash (); + auto send1 = builder.make_block () + .previous (latest_hash) + .destination (key1.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (latest_hash)) + .build (); + + auto result = node1.process_local (send1); + ASSERT_TRUE (result); + ASSERT_NEVER (500ms, node1.active.active (send1->qualified_root ())); + + // Wait until a broadcast is attempted + ASSERT_TIMELY_EQ (5s, node1.local_block_broadcaster.size (), 1); + ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out) >= 1); + + // The other node should not have received the block + ASSERT_NEVER (500ms, node2.block (send1->hash ())); + + // Connect the nodes and check that the block is propagated + node1.network.merge_peer (node2.network.endpoint ()); + ASSERT_TIMELY (5s, node1.network.find_node_id (node2.get_node_id ())); + ASSERT_TIMELY (10s, node2.block (send1->hash ())); +} \ No newline at end of file diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index ba2e74a4e8..624db69b16 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -98,6 +98,12 @@ void nano::local_block_broadcaster::stop () nano::join_or_pass (thread); } +size_t nano::local_block_broadcaster::size () const +{ + nano::lock_guard lock{ mutex }; + return local_blocks.size (); +} + void nano::local_block_broadcaster::run () { nano::unique_lock lock{ mutex }; diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index 32a073ca96..e86af05a9e 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -58,6 +58,8 @@ class local_block_broadcaster final void start (); void stop (); + size_t size () const; + std::unique_ptr collect_container_info (std::string const & name) const; private: From dbb17515f6a09a0340759ab74e90ac2352e23d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 29 Jun 2024 19:31:25 +0200 Subject: [PATCH 8/8] Fix modification of multi-index iterators --- nano/node/local_block_broadcaster.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 624db69b16..2a530712d4 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -137,6 +137,7 @@ std::chrono::milliseconds nano::local_block_broadcaster::rebroadcast_interval (u void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock & lock) { debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); std::deque to_broadcast; @@ -144,14 +145,20 @@ void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock (); - for (auto it = by_broadcast.begin (), end = by_broadcast.upper_bound (now); it != end;) + for (auto const & entry : boost::make_iterator_range (by_broadcast.begin (), by_broadcast.upper_bound (now))) { - debug_assert (it->next_broadcast <= now); - - release_assert (it->block != nullptr); - to_broadcast.push_back (*it); + debug_assert (entry.next_broadcast <= now); + release_assert (entry.block != nullptr); + to_broadcast.push_back (entry); + } - bool success = by_broadcast.modify (it++, [this, now] (auto & entry) { + // Modify multi index container outside of the loop to avoid invalidating iterators + auto & by_hash = local_blocks.get (); + for (auto const & entry : to_broadcast) + { + auto it = by_hash.find (entry.hash ()); + release_assert (it != by_hash.end ()); + bool success = by_hash.modify (it, [this, now] (auto & entry) { entry.rebroadcasts += 1; entry.last_broadcast = now; entry.next_broadcast = now + rebroadcast_interval (entry.rebroadcasts);