Secondary list of peers for generating work (#2330)
* Secondary list of peers for distributed_work

This secondary list of peers can only be used via the work_generate RPC, by setting the option "new_work_peers" to "true" along with "use_peers"; see the example request following this commit message.

- Adds new_work_peers config, which works similarly to the current work_peers config
- Changes distributed_work to receive a list of peers
- Fixes very rare cases in distributed_work where the failure callback would not be called
- Collects object stats for distributed_work

* Extract node_config::deserialize_address (const)

* Fix function signature

* Rename to secondary_work_peers, place under [node.experimental] and use 127.0.0.1:8076 as default

* Fix tests
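
A hedged sketch (not part of the diff) of what such an RPC request could look like, using the post-rename option name "secondary_work_peers" from the json_handler changes below; the hash value is a placeholder and both flags follow the RPC's usual string-boolean convention:

{
    "action": "work_generate",
    "hash": "<block root in hex>",
    "use_peers": "true",
    "secondary_work_peers": "true"
}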
guilhermelawless authored Oct 10, 2019
1 parent 8f23972 commit 2d4a1ff
Showing 10 changed files with 137 additions and 61 deletions.
38 changes: 19 additions & 19 deletions nano/core_test/distributed_work.cpp
@@ -17,16 +17,16 @@ TEST (distributed_work, no_peers)
work = work_a;
done = true;
};
node->distributed_work.make (hash, callback, node->network_params.network.publish_threshold, nano::account ());
node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ());
system.deadline_set (5s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_FALSE (nano::work_validate (hash, *work));
// should only be removed after cleanup
ASSERT_EQ (1, node->distributed_work.work.size ());
while (!node->distributed_work.work.empty ())
ASSERT_EQ (1, node->distributed_work.items.size ());
while (!node->distributed_work.items.empty ())
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
@@ -44,7 +44,7 @@ TEST (distributed_work, no_peers_disabled)
ASSERT_FALSE (work_a.is_initialized ());
done = true;
};
node.distributed_work.make (nano::block_hash (), callback_failure, nano::network_constants::publish_test_threshold);
node.distributed_work.make (nano::block_hash (), node.config.work_peers, callback_failure, nano::network_constants::publish_test_threshold);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
@@ -64,27 +64,27 @@ TEST (distributed_work, no_peers_cancel)
ASSERT_FALSE (work_a.is_initialized ());
done = true;
};
node.distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
ASSERT_EQ (1, node.distributed_work.work.size ());
node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
ASSERT_EQ (1, node.distributed_work.items.size ());
// cleanup should not cancel or remove an ongoing work
node.distributed_work.cleanup_finished ();
ASSERT_EQ (1, node.distributed_work.work.size ());
ASSERT_EQ (1, node.distributed_work.items.size ());

// manually cancel
node.distributed_work.cancel (hash, true); // forces local stop
system.deadline_set (20s);
while (!done && !node.distributed_work.work.empty ())
while (!done && !node.distributed_work.items.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}

// now using observer
done = false;
node.distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
ASSERT_EQ (1, node.distributed_work.work.size ());
node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
ASSERT_EQ (1, node.distributed_work.items.size ());
node.observers.work_cancel.notify (hash);
system.deadline_set (20s);
while (!done && !node.distributed_work.work.empty ())
while (!done && !node.distributed_work.items.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
@@ -104,12 +104,12 @@ TEST (distributed_work, no_peers_multi)
// Test many works for the same root
for (unsigned i{ 0 }; i < total; ++i)
{
node->distributed_work.make (hash, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
}
// 1 root, and _total_ requests for that root are expected, but some may have already finished
ASSERT_EQ (1, node->distributed_work.work.size ());
ASSERT_EQ (1, node->distributed_work.items.size ());
{
auto requests (node->distributed_work.work.begin ());
auto requests (node->distributed_work.items.begin ());
ASSERT_EQ (hash, requests->first);
ASSERT_GE (requests->second.size (), total - 4);
}
@@ -119,7 +119,7 @@ TEST (distributed_work, no_peers_multi)
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (5s);
while (node->distributed_work.work.empty ())
while (node->distributed_work.items.empty ())
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
@@ -129,11 +129,11 @@ TEST (distributed_work, no_peers_multi)
for (unsigned i{ 0 }; i < total; ++i)
{
nano::block_hash hash_i (i + 1);
node->distributed_work.make (hash_i, callback, node->network_params.network.publish_threshold);
node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold);
}
// 10 roots expected with 1 work each, but some may have completed so test for some
ASSERT_GT (node->distributed_work.work.size (), 5);
for (auto & requests : node->distributed_work.work)
ASSERT_GT (node->distributed_work.items.size (), 5);
for (auto & requests : node->distributed_work.items)
{
ASSERT_EQ (1, requests.second.size ());
}
@@ -143,7 +143,7 @@ TEST (distributed_work, no_peers_multi)
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (5s);
while (node->distributed_work.work.empty ())
while (node->distributed_work.items.empty ())
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
5 changes: 5 additions & 0 deletions nano/core_test/toml.cpp
@@ -160,6 +160,7 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.lmdb_max_dbs, defaults.node.lmdb_max_dbs);
ASSERT_EQ (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
ASSERT_EQ (conf.node.network_threads, defaults.node.network_threads);
ASSERT_EQ (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
ASSERT_EQ (conf.node.work_watcher_period, defaults.node.work_watcher_period);
ASSERT_EQ (conf.node.online_weight_minimum, defaults.node.online_weight_minimum);
ASSERT_EQ (conf.node.online_weight_quorum, defaults.node.online_weight_quorum);
@@ -492,6 +493,9 @@ TEST (toml, daemon_config_deserialize_no_defaults)
num_memtables = 3
total_memtable_size = 0
[node.experimental]
secondary_work_peers = ["test.org:998"]
[opencl]
device = 999
enable = true
@@ -542,6 +546,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
ASSERT_NE (conf.node.frontiers_confirmation, defaults.node.frontiers_confirmation);
ASSERT_NE (conf.node.network_threads, defaults.node.network_threads);
ASSERT_NE (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
ASSERT_NE (conf.node.work_watcher_period, defaults.node.work_watcher_period);
ASSERT_NE (conf.node.online_weight_minimum, defaults.node.online_weight_minimum);
ASSERT_NE (conf.node.online_weight_quorum, defaults.node.online_weight_quorum);
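For reference, a minimal sketch of how the renamed option would appear in a node's TOML configuration, assuming the 127.0.0.1:8076 default mentioned in the commit message and the list syntax used in the test above:

[node.experimental]
# Secondary list of work peers, consulted by work_generate only when both
# "use_peers" and "secondary_work_peers" are set to true.
secondary_work_peers = ["127.0.0.1:8076"]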
2 changes: 1 addition & 1 deletion nano/core_test/wallet.cpp
@@ -1214,7 +1214,7 @@ TEST (wallet, work_watcher_generation_disabled)
updated_difficulty = existing->difficulty;
}
ASSERT_EQ (updated_difficulty, difficulty);
ASSERT_TRUE (node.distributed_work.work.empty ());
ASSERT_TRUE (node.distributed_work.items.empty ());
}

TEST (wallet, work_watcher_removed)
55 changes: 41 additions & 14 deletions nano/node/distributed_work.cpp
@@ -14,13 +14,14 @@ std::shared_ptr<request_type> nano::work_peer_request::get_prepared_json_request
return request;
}

nano::distributed_work::distributed_work (unsigned int backoff_a, nano::node & node_a, nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a) :
nano::distributed_work::distributed_work (nano::node & node_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, unsigned int backoff_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a) :
callback (callback_a),
backoff (backoff_a),
node (node_a),
root (root_a),
account (account_a),
need_resolve (node.config.work_peers),
peers (peers_a),
need_resolve (peers_a),
difficulty (difficulty_a),
elapsed (nano::timer_state::started, "distributed work generation timer")
{
Expand Down Expand Up @@ -185,6 +186,11 @@ void nano::distributed_work::start_work ()
});
}
}

if (!local_generation_started && outstanding.empty ())
{
callback (boost::none);
}
}

void nano::distributed_work::cancel_connection (std::shared_ptr<nano::work_peer_request> connection_a)
Expand Down Expand Up @@ -341,10 +347,14 @@ void nano::distributed_work::handle_failure (bool const last_a)
std::weak_ptr<nano::node> node_w (node.shared ());
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
// clang-format off
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
if (auto node_l = node_w.lock ())
{
node_l->distributed_work.make (next_backoff, root_l, callback_l, difficulty, account_l);
node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
}
else if (callback_l)
{
callback_l (boost::none);
}
});
// clang-format on
Expand Down Expand Up @@ -374,20 +384,20 @@ node (node_a)
{
}

void nano::distributed_work_factory::make (nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
void nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
{
make (1, root_a, callback_a, difficulty_a, account_a);
make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
}

void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
{
cleanup_finished ();
if (node.work_generation_enabled ())
{
auto distributed (std::make_shared<nano::distributed_work> (backoff_a, node, root_a, callback_a, difficulty_a, account_a));
auto distributed (std::make_shared<nano::distributed_work> (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
{
nano::lock_guard<std::mutex> guard (mutex);
work[root_a].emplace_back (distributed);
items[root_a].emplace_back (distributed);
}
distributed->start ();
}
@@ -401,8 +411,8 @@ void nano::distributed_work_factory::cancel (nano::root const & root_a, bool con
{
{
nano::lock_guard<std::mutex> guard (mutex);
auto existing_l (work.find (root_a));
if (existing_l != work.end ())
auto existing_l (items.find (root_a));
if (existing_l != items.end ())
{
for (auto & distributed_w : existing_l->second)
{
@@ -412,15 +422,15 @@ void nano::distributed_work_factory::cancel (nano::root const & root_a, bool con
distributed_l->cancel_once ();
}
}
work.erase (existing_l);
items.erase (existing_l);
}
}
}

void nano::distributed_work_factory::cleanup_finished ()
{
nano::lock_guard<std::mutex> guard (mutex);
for (auto it (work.begin ()), end (work.end ()); it != end;)
for (auto it (items.begin ()), end (items.end ()); it != end;)
{
it->second.erase (std::remove_if (it->second.begin (), it->second.end (), [](auto distributed_a) {
return distributed_a.expired ();
@@ -429,11 +439,28 @@ void nano::distributed_work_factory::cleanup_finished ()

if (it->second.empty ())
{
it = work.erase (it);
it = items.erase (it);
}
else
{
++it;
}
}
}

namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name)
{
size_t item_count = 0;
{
nano::lock_guard<std::mutex> guard (distributed_work.mutex);
item_count = distributed_work.items.size ();
}

auto composite = std::make_unique<seq_con_info_composite> (name);
auto sizeof_item_element = sizeof (decltype (distributed_work.items)::value_type);
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "items", item_count, sizeof_item_element }));
return composite;
}
}
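
A hypothetical caller sketch (not part of this diff) showing the factory driven with an explicit peer list, mirroring the updated tests; node and hash are assumed to exist, and the callback body is a placeholder:

// The primary peers from config.work_peers are passed here; config.secondary_work_peers
// could be passed instead to target the secondary list.
node.distributed_work.make (hash, node.config.work_peers, [](boost::optional<uint64_t> work_a) {
	if (work_a.is_initialized ())
	{
		// *work_a holds the generated work value
	}
	else
	{
		// all peers failed, or the request was cancelled
	}
},
node.network_params.network.publish_threshold, nano::account ());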
14 changes: 9 additions & 5 deletions nano/node/distributed_work.hpp
@@ -38,7 +38,7 @@ class work_peer_request final
class distributed_work final : public std::enable_shared_from_this<nano::distributed_work>
{
public:
distributed_work (unsigned int, nano::node &, nano::root const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
distributed_work (nano::node &, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const & peers_a, unsigned int, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
~distributed_work ();
void start ();
void start_work ();
@@ -60,6 +60,7 @@ class distributed_work final : public std::enable_shared_from_this<nano::distrib
std::mutex mutex;
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<nano::work_peer_request>> connections;
std::vector<std::pair<std::string, uint16_t>> const peers;
std::vector<std::pair<std::string, uint16_t>> need_resolve;
uint64_t difficulty;
uint64_t work_result{ 0 };
@@ -76,13 +77,16 @@ class distributed_work_factory final
{
public:
distributed_work_factory (nano::node &);
void make (nano::root const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
void make (unsigned int, nano::root const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
void make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
void make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
void cancel (nano::root const &, bool const local_stop = false);
void cleanup_finished ();

std::unordered_map<nano::root, std::vector<std::weak_ptr<nano::distributed_work>>> work;
std::mutex mutex;
nano::node & node;
std::unordered_map<nano::root, std::vector<std::weak_ptr<nano::distributed_work>>> items;
std::mutex mutex;
};

class seq_con_info_component;
std::unique_ptr<seq_con_info_component> collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name);
}
8 changes: 5 additions & 3 deletions nano/node/json_handler.cpp
@@ -4530,7 +4530,7 @@ void nano::json_handler::work_generate ()
}
if (!ec)
{
bool use_peers (request.get_optional<bool> ("use_peers") == true);
auto use_peers (request.get<bool> ("use_peers", false));
auto rpc_l (shared_from_this ());
auto callback = [rpc_l, hash, this](boost::optional<uint64_t> const & work_a) {
if (work_a)
@@ -4566,9 +4566,11 @@
}
else
{
if (node.work_generation_enabled ())
auto secondary_work_peers_l (request.get<bool> ("secondary_work_peers", false));
auto const & peers_l (secondary_work_peers_l ? node.config.secondary_work_peers : node.config.work_peers);
if (node.work_generation_enabled (peers_l))
{
node.work_generate (hash, callback, difficulty, account);
node.work_generate (hash, callback, difficulty, account, secondary_work_peers_l);
}
else
{
13 changes: 10 additions & 3 deletions nano/node/node.cpp
@@ -612,6 +612,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (node & node, const
composite->add_component (collect_seq_con_info (node.confirmation_height_processor, "confirmation_height_processor"));
composite->add_component (collect_seq_con_info (node.pending_confirmation_height, "pending_confirmation_height"));
composite->add_component (collect_seq_con_info (node.worker, "worker"));
composite->add_component (collect_seq_con_info (node.distributed_work, "distributed_work"));
return composite;
}
}
@@ -967,7 +968,12 @@ bool nano::node::local_work_generation_enabled () const

bool nano::node::work_generation_enabled () const
{
return !config.work_peers.empty () || local_work_generation_enabled ();
return work_generation_enabled (config.work_peers);
}

bool nano::node::work_generation_enabled (std::vector<std::pair<std::string, uint16_t>> const & peers_a) const
{
return !peers_a.empty () || local_work_generation_enabled ();
}

boost::optional<uint64_t> nano::node::work_generate_blocking (nano::block & block_a)
@@ -990,9 +996,10 @@ void nano::node::work_generate (nano::root const & root_a, std::function<void(bo
work_generate (root_a, callback_a, network_params.network.publish_threshold, account_a);
}

void nano::node::work_generate (nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
void nano::node::work_generate (nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a, bool secondary_work_peers_a)
{
distributed_work.make (root_a, callback_a, difficulty_a, account_a);
auto const & peers_l (secondary_work_peers_a ? config.secondary_work_peers : config.work_peers);
distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a);
}

boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const & root_a, boost::optional<nano::account> const & account_a)
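At the node level, a hedged sketch of requesting work against the secondary peer list via the new trailing bool (root and the callback are placeholders; the default of false keeps the previous behaviour):

// Passing true as the final argument selects config.secondary_work_peers.
node.work_generate (root, [](boost::optional<uint64_t> work_a) {
	// boost::none indicates failure; otherwise *work_a holds the work value
},
node.network_params.network.publish_threshold, boost::none, true);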
3 changes: 2 additions & 1 deletion nano/node/node.hpp
@@ -125,11 +125,12 @@ class node final : public std::enable_shared_from_this<nano::node>
int price (nano::uint128_t const &, int);
bool local_work_generation_enabled () const;
bool work_generation_enabled () const;
bool work_generation_enabled (std::vector<std::pair<std::string, uint16_t>> const &) const;
boost::optional<uint64_t> work_generate_blocking (nano::block &, uint64_t);
boost::optional<uint64_t> work_generate_blocking (nano::block &);
boost::optional<uint64_t> work_generate_blocking (nano::root const &, uint64_t, boost::optional<nano::account> const & = boost::none);
boost::optional<uint64_t> work_generate_blocking (nano::root const &, boost::optional<nano::account> const & = boost::none);
void work_generate (nano::root const &, std::function<void(boost::optional<uint64_t>)>, uint64_t, boost::optional<nano::account> const & = boost::none);
void work_generate (nano::root const &, std::function<void(boost::optional<uint64_t>)>, uint64_t, boost::optional<nano::account> const & = boost::none, bool const = false);
void work_generate (nano::root const &, std::function<void(boost::optional<uint64_t>)>, boost::optional<nano::account> const & = boost::none);
void add_initial_peers ();
void block_confirm (std::shared_ptr<nano::block>);