Skip to content

Commit

Permalink
Local block broadcaster (#4454)
Browse files Browse the repository at this point in the history
* Add rolled back event

* Local block broadcaster

* Rate limit block broadcasting

* Rename to `local_block_broadcaster`

* Node initialization order

* Fix tests

* Bump local block queue size to 8k elements
  • Loading branch information
pwojcikdev authored Mar 5, 2024
1 parent 65de6fe commit 5f28f1a
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 89 deletions.
8 changes: 8 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum class type : uint8_t
election_scheduler,
optimistic_scheduler,
handshake,
local_block_broadcaster,

bootstrap_ascending,
bootstrap_ascending_accounts,
Expand Down Expand Up @@ -102,6 +103,7 @@ enum class detail : uint8_t
old,
gap_previous,
gap_source,
rollback,
rollback_failed,
progress,
bad_signature,
Expand Down Expand Up @@ -328,6 +330,12 @@ enum class detail : uint8_t
deprioritize,
deprioritize_failed,

// block broadcaster
broadcast_normal,
broadcast_aggressive,
erase_old,
erase_confirmed,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::scheduler_priority:
thread_role_name_string = "Sched Priority";
break;
case nano::thread_role::name::local_block_broadcasting:
thread_role_name_string = "Local broadcast";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum class name
scheduler_manual,
scheduler_optimistic,
scheduler_priority,
local_block_broadcasting,
};

/*
Expand Down
4 changes: 2 additions & 2 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ add_library(
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_broadcast.cpp
block_broadcast.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
Expand Down Expand Up @@ -100,6 +98,8 @@ add_library(
ipc/ipc_server.cpp
json_handler.hpp
json_handler.cpp
local_block_broadcaster.cpp
local_block_broadcaster.hpp
make_store.hpp
make_store.cpp
network.hpp
Expand Down
51 changes: 0 additions & 51 deletions nano/node/block_broadcast.cpp

This file was deleted.

28 changes: 0 additions & 28 deletions nano/node/block_broadcast.hpp

This file was deleted.

4 changes: 4 additions & 0 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,15 @@ void nano::block_processor::rollback_competitor (store::write_transaction const
}
else
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback);
node.logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
}

// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
{
rolled_back.notify (i);

node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
Expand Down
1 change: 1 addition & 0 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class block_processor final
// The batch observer feeds the processed observer
nano::observer_set<nano::block_status const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;
nano::observer_set<std::shared_ptr<nano::block> const &> rolled_back;

private:
// Roll back block in the ledger that conflicts with 'block'
Expand Down
170 changes: 170 additions & 0 deletions nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/local_block_broadcaster.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>

nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) :
node{ node_a },
block_processor{ block_processor_a },
network{ network_a },
stats{ stats_a },
enabled{ enabled_a }
{
if (!enabled)
{
return;
}

block_processor.batch_processed.add ([this] (auto const & batch) {
bool should_notify = false;
for (auto const & [result, context] : batch)
{
// Only rebroadcast local blocks that were successfully processed (no forks or gaps)
if (result == nano::block_status::progress && context.source == nano::block_source::local)
{
nano::lock_guard<nano::mutex> guard{ mutex };
local_blocks.emplace_back (local_entry{ context.block, std::chrono::steady_clock::now () });
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::insert);
should_notify = true;
}
}
if (should_notify)
{
condition.notify_all ();
}
});

block_processor.rolled_back.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased);
});
}

nano::local_block_broadcaster::~local_block_broadcaster ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::local_block_broadcaster::start ()
{
if (!enabled)
{
return;
}

debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::local_block_broadcasting);
run ();
} };
}

void nano::local_block_broadcaster::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
nano::join_or_pass (thread);
}

void nano::local_block_broadcaster::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

condition.wait_for (lock, check_interval);
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds

if (!stopped)
{
cleanup ();
run_broadcasts (lock);
debug_assert (lock.owns_lock ());
}
}
}

void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());

std::vector<std::shared_ptr<nano::block>> to_broadcast;

auto const now = std::chrono::steady_clock::now ();
for (auto & entry : local_blocks)
{
if (elapsed (entry.last_broadcast, broadcast_interval, now))
{
entry.last_broadcast = now;
to_broadcast.push_back (entry.block);
}
}

lock.unlock ();

for (auto const & block : to_broadcast)
{
while (!limiter.should_pass (1))
{
std::this_thread::sleep_for (std::chrono::milliseconds{ 100 });
if (stopped)
{
return;
}
}

stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);

network.flood_block_initial (block);
}

lock.lock ();
}

void nano::local_block_broadcaster::cleanup ()
{
debug_assert (!mutex.try_lock ());

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > max_size)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}

// TODO: Mutex is held during IO, but it should be fine since it's not performance critical
auto transaction = node.store.tx_begin_read ();
erase_if (local_blocks, [this, &transaction] (auto const & entry) {
transaction.refresh_if_needed ();

if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
{
// This block has never been broadcasted, keep it so it's broadcasted at least once
return false;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_confirmed);
return true;
}
return false;
});
}

std::unique_ptr<nano::container_info_component> nano::local_block_broadcaster::collect_container_info (const std::string & name) const
{
nano::lock_guard<nano::mutex> guard{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) }));
return composite;
}
Loading

0 comments on commit 5f28f1a

Please sign in to comment.