Skip to content

Commit

Permalink
RPC publish optional work_watch flag to add to work_watcher (#2168)
Browse files Browse the repository at this point in the history
* publishes from rpc send, publish and wallet send, publish will no longer be eligible to drop due to bandwidth limiting

* PR feedback edits

* formatting

* add flag watch_work, defaulting to true to process RPC call

* normalize bool name to fit other usages

* add watch_work setting to false to require enable_control

* default watch_work is true, should check for false
  • Loading branch information
Russel Waters authored Aug 11, 2019
1 parent d44660e commit 3d5c2d2
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 29 deletions.
17 changes: 11 additions & 6 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,17 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
}
}

void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a)
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a)
{
// Start collecting quorum on block
node.active.start (block_a);
//add block to watcher if desired after block has been added to active
if (watch_work_a)
{
node.wallets.watcher.add (block_a);
}
// Announce block contents to the network
node.network.flood_block (block_a);
node.network.flood_block (block_a, false);
if (node.config.enable_voting)
{
// Announce our weighted vote to the network
Expand Down Expand Up @@ -389,7 +394,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
});
}

nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a)
nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a)
{
nano::process_return result;
auto hash (info_a.block->hash ());
Expand All @@ -407,7 +412,7 @@ nano::process_return nano::block_processor::process_one (nano::transaction const
}
if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash))
{
process_live (hash, info_a.block);
process_live (hash, info_a.block, watch_work_a);
}
queue_unchecked (transaction_a, hash);
break;
Expand Down Expand Up @@ -522,10 +527,10 @@ nano::process_return nano::block_processor::process_one (nano::transaction const
return result;
}

nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a)
nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a)
{
nano::unchecked_info info (block_a, block_a->account (), 0, nano::signature_verification::unknown);
auto result (process_one (transaction_a, info));
auto result (process_one (transaction_a, info, watch_work_a));
return result;
}

Expand Down
6 changes: 3 additions & 3 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class block_processor final
bool should_log (bool);
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::transaction const &, nano::unchecked_info);
nano::process_return process_one (nano::transaction const &, std::shared_ptr<nano::block>);
nano::process_return process_one (nano::transaction const &, nano::unchecked_info, const bool = false);
nano::process_return process_one (nano::transaction const &, std::shared_ptr<nano::block>, const bool = false);
nano::vote_generator generator;
// Delay required for average network propagartion before requesting confirmation
static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 };
Expand All @@ -55,7 +55,7 @@ class block_processor final
void queue_unchecked (nano::transaction const &, nano::block_hash const &);
void verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock<std::mutex> &, size_t = std::numeric_limits<size_t>::max ());
void process_batch (std::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false);
bool stopped;
bool active;
bool awaiting_write{ false };
Expand Down
8 changes: 4 additions & 4 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void nano::frontier_req_client::run ()
}
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
Expand Down Expand Up @@ -352,7 +352,7 @@ void nano::bulk_pull_client::request ()
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in);
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_pull_client::receive_block ()
Expand Down Expand Up @@ -539,7 +539,7 @@ void nano::bulk_push_client::start ()
}
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_push_client::push (nano::transaction const & transaction_a)
Expand Down Expand Up @@ -678,7 +678,7 @@ void nano::bulk_pull_account_client::request ()
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in);
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_pull_account_client::receive_pending ()
Expand Down
2 changes: 1 addition & 1 deletion nano/node/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ bool nano::election::publish (std::shared_ptr<nano::block> block_a)
{
blocks.insert (std::make_pair (block_a->hash (), block_a));
confirm_if_quorum (transaction);
node.network.flood_block (block_a);
node.network.flood_block (block_a, false);
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2854,6 +2854,7 @@ void nano::json_handler::payment_wait ()
void nano::json_handler::process ()
{
const bool json_block_l = request.get<bool> ("json_block", false);
const bool watch_work_l = request.get<bool> ("watch_work", true);
std::shared_ptr<nano::block> block;
if (json_block_l)
{
Expand Down Expand Up @@ -2935,7 +2936,7 @@ void nano::json_handler::process ()
{
if (!nano::work_validate (*block))
{
auto result (node.process_local (block));
auto result (node.process_local (block, watch_work_l));
switch (result.code)
{
case nano::process_result::progress:
Expand Down
4 changes: 2 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel>
return result;
}

void nano::network::flood_message (nano::message const & message_a)
void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a)
{
auto list (list_fanout ());
for (auto i (list.begin ()), n (list.end ()); i != n; ++i)
{
(*i)->send (message_a);
(*i)->send (message_a, nullptr, is_droppable_a);
}
}

Expand Down
7 changes: 4 additions & 3 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class network final
~network ();
void start ();
void stop ();
void flood_message (nano::message const &);
void flood_message (nano::message const &, bool const = true);
void flood_keepalive ()
{
nano::keepalive message;
Expand All @@ -126,11 +126,12 @@ class network final
nano::confirm_ack message (vote_a);
flood_message (message);
}
void flood_block (std::shared_ptr<nano::block> block_a)
void flood_block (std::shared_ptr<nano::block> block_a, bool const is_droppable_a = true)
{
nano::publish publish (block_a);
flood_message (publish);
flood_message (publish, is_droppable_a);
}

void flood_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
void merge_peers (std::array<nano::endpoint, 8> const &);
void merge_peer (nano::endpoint const &);
Expand Down
4 changes: 2 additions & 2 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ nano::process_return nano::node::process (nano::block const & block_a)
return result;
}

nano::process_return nano::node::process_local (std::shared_ptr<nano::block> block_a)
nano::process_return nano::node::process_local (std::shared_ptr<nano::block> block_a, bool const work_watcher_a)
{
// Add block hash as recently arrived to trigger automatic rebroadcast and election
block_arrival.add (block_a->hash ());
Expand All @@ -618,7 +618,7 @@ nano::process_return nano::node::process_local (std::shared_ptr<nano::block> blo
block_processor.wait_write ();
// Process block
auto transaction (store.tx_begin_write ());
return block_processor.process_one (transaction, info);
return block_processor.process_one (transaction, info, work_watcher_a);
}

void nano::node::start ()
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class node final : public std::enable_shared_from_this<nano::node>
void process_confirmed (nano::election_status const &, uint8_t = 0);
void process_active (std::shared_ptr<nano::block>);
nano::process_return process (nano::block const &);
nano::process_return process_local (std::shared_ptr<nano::block>);
nano::process_return process_local (std::shared_ptr<nano::block>, bool const = false);
void keepalive_preconfigured (std::vector<std::string> const &);
nano::block_hash latest (nano::account const &);
nano::uint128_t balance (nano::account const &);
Expand Down
4 changes: 2 additions & 2 deletions nano/node/transport/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ node (node_a)
{
}

void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, bool const & is_dropable)
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, bool const is_droppable_a)
{
callback_visitor visitor;
message_a.visit (visitor);
auto buffer (message_a.to_bytes ());
auto detail (visitor.result);
if (!is_dropable || !limiter.should_drop (buffer->size ()))
if (!is_droppable_a || !limiter.should_drop (buffer->size ()))
{
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace transport
virtual ~channel () = default;
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, bool const & = true);
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, bool const = true);
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::string to_string () const = 0;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ std::shared_ptr<nano::block> nano::wallet::send_action (nano::account const & so
if (block != nullptr)
{
cached_block = true;
wallets.node.network.flood_block (block);
wallets.node.network.flood_block (block, false);
}
}
else if (status != MDB_NOTFOUND)
Expand Down Expand Up @@ -1500,7 +1500,7 @@ void nano::work_watcher::run ()
current->second = block;
}
}
node.network.flood_block (block);
node.network.flood_block (block, false);
node.active.update_difficulty (*block.get ());
lock.lock ();
if (stopped)
Expand Down
3 changes: 2 additions & 1 deletion nano/rpc/rpc_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void nano::rpc_handler::process_request ()
else if (action == "process")
{
auto force = request.get_optional<bool> ("force");
if (force.is_initialized () && *force && !rpc_config.enable_control)
auto watch_work = request.get_optional<bool> ("watch_work");
if (((force.is_initialized () && *force) || (watch_work.is_initialized () && !*watch_work)) && !rpc_config.enable_control)
{
json_error_response (response, rpc_control_disabled_ec.message ());
error = true;
Expand Down
60 changes: 60 additions & 0 deletions nano/rpc_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,66 @@ TEST (rpc, process_block)
ASSERT_EQ (send.hash ().to_string (), send_hash);
}

TEST (rpc, process_block_with_work_watcher)
{
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.enable_voting = false;
auto & node1 = *system.add_node (node_config);
nano::keypair key;
auto latest (system.nodes[0]->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, latest, nano::test_genesis_key.pub, nano::genesis_amount - 100, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)));
uint64_t difficulty1 (0);
nano::work_validate (*send, &difficulty1);
auto multiplier1 = nano::difficulty::to_multiplier (difficulty1, node1.network_params.network.publish_threshold);
enable_ipc_transport_tcp (node1.config.ipc_config.transport_tcp);
nano::node_rpc_config node_rpc_config;
nano::ipc::ipc_server ipc_server (node1, node_rpc_config);
nano::rpc_config rpc_config (true);
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config);
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "process");
request.put ("work_watcher", true);
std::string json;
send->serialize_json (json);
request.put ("block", json);
test_response response (request, rpc.config.port, system.io_ctx);
system.deadline_set (5s);
while (response.status == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (200, response.status);
system.deadline_set (10s);
while (system.nodes[0]->latest (nano::test_genesis_key.pub) != send->hash ())
{
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (10s);
auto updated (false);
uint64_t updated_difficulty;
while (!updated)
{
std::unique_lock<std::mutex> lock (node1.active.mutex);
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node1.active.multipliers_cb.size (); i++)
{
node1.active.multipliers_cb.push_back (multiplier1 * (1 + i / 100.));
}
node1.active.update_active_difficulty (lock);
auto const existing (node1.active.roots.find (send->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node1.active.roots.end ());
updated = existing->difficulty != difficulty1;
updated_difficulty = existing->difficulty;
lock.unlock ();
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_GT (updated_difficulty, difficulty1);
}

TEST (rpc, process_block_no_work)
{
nano::system system (24000, 1);
Expand Down

0 comments on commit 3d5c2d2

Please sign in to comment.