Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local block broadcaster #4454

Merged
merged 7 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum class type : uint8_t
election_scheduler,
optimistic_scheduler,
handshake,
local_block_broadcaster,

bootstrap_ascending,
bootstrap_ascending_accounts,
Expand Down Expand Up @@ -102,6 +103,7 @@ enum class detail : uint8_t
old,
gap_previous,
gap_source,
rollback,
rollback_failed,
progress,
bad_signature,
Expand Down Expand Up @@ -328,6 +330,12 @@ enum class detail : uint8_t
deprioritize,
deprioritize_failed,

// block broadcaster
broadcast_normal,
broadcast_aggressive,
erase_old,
erase_confirmed,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::scheduler_priority:
thread_role_name_string = "Sched Priority";
break;
case nano::thread_role::name::local_block_broadcasting:
thread_role_name_string = "Local broadcast";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum class name
scheduler_manual,
scheduler_optimistic,
scheduler_priority,
local_block_broadcasting,
};

/*
Expand Down
4 changes: 2 additions & 2 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ add_library(
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_broadcast.cpp
block_broadcast.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
Expand Down Expand Up @@ -100,6 +98,8 @@ add_library(
ipc/ipc_server.cpp
json_handler.hpp
json_handler.cpp
local_block_broadcaster.cpp
local_block_broadcaster.hpp
make_store.hpp
make_store.cpp
network.hpp
Expand Down
51 changes: 0 additions & 51 deletions nano/node/block_broadcast.cpp

This file was deleted.

28 changes: 0 additions & 28 deletions nano/node/block_broadcast.hpp

This file was deleted.

4 changes: 4 additions & 0 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,15 @@ void nano::block_processor::rollback_competitor (store::write_transaction const
}
else
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback);
node.logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
}

// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
{
rolled_back.notify (i);

node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
Expand Down
1 change: 1 addition & 0 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class block_processor final
// The batch observer feeds the processed observer
nano::observer_set<nano::block_status const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;
nano::observer_set<std::shared_ptr<nano::block> const &> rolled_back;

private:
// Roll back block in the ledger that conflicts with 'block'
Expand Down
170 changes: 170 additions & 0 deletions nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/local_block_broadcaster.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>

nano::local_block_broadcaster::local_block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) :
node{ node_a },
block_processor{ block_processor_a },
network{ network_a },
stats{ stats_a },
enabled{ enabled_a }
{
if (!enabled)
{
return;
}

block_processor.batch_processed.add ([this] (auto const & batch) {
bool should_notify = false;
for (auto const & [result, context] : batch)
{
// Only rebroadcast local blocks that were successfully processed (no forks or gaps)
if (result == nano::block_status::progress && context.source == nano::block_source::local)
{
nano::lock_guard<nano::mutex> guard{ mutex };
local_blocks.emplace_back (local_entry{ context.block, std::chrono::steady_clock::now () });
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::insert);
should_notify = true;
}
}
if (should_notify)
{
condition.notify_all ();
}
});

block_processor.rolled_back.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased);
});
}

nano::local_block_broadcaster::~local_block_broadcaster ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::local_block_broadcaster::start ()
{
if (!enabled)
{
return;
}

debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::local_block_broadcasting);
run ();
} };
}

void nano::local_block_broadcaster::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
nano::join_or_pass (thread);
}

void nano::local_block_broadcaster::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

condition.wait_for (lock, check_interval);
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds

if (!stopped)
{
cleanup ();
run_broadcasts (lock);
debug_assert (lock.owns_lock ());
}
}
}

void nano::local_block_broadcaster::run_broadcasts (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());

std::vector<std::shared_ptr<nano::block>> to_broadcast;

auto const now = std::chrono::steady_clock::now ();
for (auto & entry : local_blocks)
{
if (elapsed (entry.last_broadcast, broadcast_interval, now))
{
entry.last_broadcast = now;
to_broadcast.push_back (entry.block);
}
}

lock.unlock ();

for (auto const & block : to_broadcast)
{
while (!limiter.should_pass (1))
{
std::this_thread::sleep_for (std::chrono::milliseconds{ 100 });
if (stopped)
{
return;
}
}

stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);

network.flood_block_initial (block);
}

lock.lock ();
}

void nano::local_block_broadcaster::cleanup ()
{
debug_assert (!mutex.try_lock ());

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > max_size)
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}

// TODO: Mutex is held during IO, but it should be fine since it's not performance critical
auto transaction = node.store.tx_begin_read ();
erase_if (local_blocks, [this, &transaction] (auto const & entry) {
transaction.refresh_if_needed ();

if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
{
// This block has never been broadcasted, keep it so it's broadcasted at least once
return false;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::erase_confirmed);
return true;
}
return false;
});
}

std::unique_ptr<nano::container_info_component> nano::local_block_broadcaster::collect_container_info (const std::string & name) const
{
nano::lock_guard<nano::mutex> guard{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) }));
return composite;
}
Loading
Loading