Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Election scheduler predicates #3296

Merged
merged 5 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1458,3 +1458,73 @@ TEST (active_transactions, vacancy)
ASSERT_EQ (1, node.active.vacancy ());
ASSERT_EQ (0, node.active.size ());
}

// Ensure transactions in excess of capacity are removed in fifo order
TEST (active_transactions, fifo)
{
nano::system system;
nano::node_config config{ nano::get_available_port (), system.logging };
config.active_elections_size = 1;
auto & node = *system.add_node (config);
nano::keypair key0;
nano::keypair key1;
nano::state_block_builder builder;
// Construct two pending entries that can be received simultaneously
auto send0 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (nano::genesis_hash)
.representative (nano::dev_genesis_key.pub)
.link (key0.pub)
.balance (nano::genesis_amount - 1)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send0).code);
nano::blocks_confirm (node, { send0 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send0->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());
auto send1 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (send0->hash ())
.representative (nano::dev_genesis_key.pub)
.link (key1.pub)
.balance (nano::genesis_amount - 2)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (send0->hash ()))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
nano::blocks_confirm (node, { send1 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send1->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());

auto receive0 = builder.make_block ()
.account (key0.pub)
.previous (0)
.representative (nano::dev_genesis_key.pub)
.link (send0->hash ())
.balance (1)
.sign (key0.prv, key0.pub)
.work (*system.work.generate (key0.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*receive0).code);
auto receive1 = builder.make_block ()
.account (key1.pub)
.previous (0)
.representative (nano::dev_genesis_key.pub)
.link (send1->hash ())
.balance (1)
.sign (key1.prv, key1.pub)
.work (*system.work.generate (key1.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*receive1).code);
node.scheduler.manual (receive0);
// Ensure first transaction becomes active
ASSERT_TIMELY (1s, node.active.election (receive0->qualified_root ()) != nullptr);
node.scheduler.manual (receive1);
// Ensure second transaction becomes active
ASSERT_TIMELY (1s, node.active.election (receive1->qualified_root ()) != nullptr);
// Ensure excess transactions get trimmed
ASSERT_TIMELY (1s, node.active.size () == 1);
// Ensure the surviving transaction is the least recently inserted
ASSERT_TIMELY (1s, node.active.election (receive1->qualified_root ()) != nullptr);
}
3 changes: 2 additions & 1 deletion nano/core_test/election_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ TEST (election_scheduler, flush_vacancy)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
node.scheduler.manual (send);
ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
// Ensure this call does not block, even though no elections can be activated.
node.scheduler.flush ();
ASSERT_EQ (0, node.active.size ());
Expand Down
14 changes: 11 additions & 3 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<nano::mutex>
{
bool const confirmed_l (election_l->confirmed ());

unconfirmed_count_l += !confirmed_l;
bool const overflow_l (unconfirmed_count_l > node.config.active_elections_size && election_l->election_start < election_ttl_cutoff_l);
if (overflow_l || election_l->transition_time (solicitor))
if (election_l->transition_time (solicitor))
{
if (election_l->optimistic () && election_l->failed ())
{
Expand Down Expand Up @@ -1046,6 +1044,16 @@ void nano::active_transactions::erase_hash (nano::block_hash const & hash_a)
debug_assert (erased == 1);
}

void nano::active_transactions::erase_oldest ()
{
nano::unique_lock<nano::mutex> lock (mutex);
if (!roots.empty ())
{
auto item = roots.get<tag_random_access> ().front ();
cleanup_election (lock, *item.election);
}
}

bool nano::active_transactions::empty ()
{
nano::lock_guard<nano::mutex> lock (mutex);
Expand Down
1 change: 1 addition & 0 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class active_transactions final
std::vector<std::shared_ptr<nano::election>> list_active (size_t = std::numeric_limits<size_t>::max ());
void erase (nano::block const &);
void erase_hash (nano::block_hash const &);
void erase_oldest ();
bool empty ();
size_t size ();
void stop ();
Expand Down
42 changes: 28 additions & 14 deletions nano/node/election_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,45 @@ size_t nano::election_scheduler::priority_queue_size () const
return priority.size ();
}

bool nano::election_scheduler::priority_queue_predicate () const
{
return node.active.vacancy () > 0 && !priority.empty ();
}

bool nano::election_scheduler::manual_queue_predicate () const
{
return !manual_queue.empty ();
}

bool nano::election_scheduler::overfill_predicate () const
{
return node.active.vacancy () < 0;
}

void nano::election_scheduler::run ()
{
nano::thread_role::set (nano::thread_role::name::election_scheduler);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
auto vacancy = node.active.vacancy ();
auto has_vacancy = vacancy > 0;
auto available = !priority.empty () || !manual_queue.empty ();
return stopped || (has_vacancy && available);
return stopped || priority_queue_predicate () || manual_queue_predicate () || overfill_predicate ();
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
if (!priority.empty ())
if (overfill_predicate ())
{
node.active.erase_oldest ();
}
else if (manual_queue_predicate ())
{
auto const [block, previous_balance, election_behavior, confirmation_action] = manual_queue.front ();
nano::unique_lock<nano::mutex> lock2 (node.active.mutex);
node.active.insert_impl (lock2, block, previous_balance, election_behavior, confirmation_action);
manual_queue.pop_front ();
}
else if (priority_queue_predicate ())
{
auto block = priority.top ();
std::shared_ptr<nano::election> election;
Expand All @@ -113,15 +136,6 @@ void nano::election_scheduler::run ()
election->transition_active ();
}
priority.pop ();
++priority_queued;
}
if (!manual_queue.empty ())
{
auto const [block, previous_balance, election_behavior, confirmation_action] = manual_queue.front ();
nano::unique_lock<nano::mutex> lock2 (node.active.mutex);
node.active.insert_impl (lock2, block, previous_balance, election_behavior, confirmation_action);
manual_queue.pop_front ();
++manual_queued;
}
notify ();
}
Expand Down
5 changes: 3 additions & 2 deletions nano/node/election_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class election_scheduler final
private:
void run ();
bool empty_locked () const;
bool priority_queue_predicate () const;
bool manual_queue_predicate () const;
bool overfill_predicate () const;
nano::prioritization priority;
uint64_t priority_queued{ 0 };
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior, std::function<void (std::shared_ptr<nano::block>)>>> manual_queue;
uint64_t manual_queued{ 0 };
nano::node & node;
bool stopped;
nano::condition_variable condition;
Expand Down