Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web socket subscription for active difficulty #2091

Merged
66 changes: 66 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,72 @@ TEST (websocket, subscription_edge)
node1->stop ();
}

// Test client subscribing to changes in active_difficulty
TEST (websocket, subscribe_active_difficulty)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);

ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));

// Subscribe to active_difficulty and wait for response asynchronously
ack_ready = false;
auto client_task = ([&node1]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", "24078", R"json({"action": "subscribe", "topic": "active_difficulty", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (client_task);

// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty));

// Fake history records to force trended_active_difficulty change
node1->active.multipliers_cb.push_front (10.);

// Wait to receive the active_difficulty message
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Check active_difficulty response
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "active_difficulty");

auto message_contents = event.get_child ("message");
uint64_t network_minimum;
nano::from_string_hex (message_contents.get<std::string> ("network_minimum"), network_minimum);
ASSERT_EQ (network_minimum, node1->network_params.network.publish_threshold);

uint64_t network_current;
nano::from_string_hex (message_contents.get<std::string> ("network_current"), network_current);
ASSERT_EQ (network_current, node1->active.active_difficulty ());

double multiplier = message_contents.get<double> ("multiplier");
ASSERT_NEAR (multiplier, nano::difficulty::to_multiplier (node1->active.active_difficulty (), node1->network_params.network.publish_threshold), 1e-6);

node1->stop ();
}

/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */
TEST (websocket, confirmation)
{
Expand Down
6 changes: 6 additions & 0 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,13 @@ void nano::active_transactions::update_active_difficulty (std::unique_lock<std::
auto sum (std::accumulate (multipliers_cb.begin (), multipliers_cb.end (), double(0)));
auto difficulty = nano::difficulty::from_multiplier (sum / multipliers_cb.size (), node.network_params.network.publish_threshold);
assert (difficulty >= node.network_params.network.publish_threshold);

bool notify_change = trended_active_difficulty != difficulty;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently implemented to only notify if the active difficulty changes - is this correct or should it notify on every iteration even if active difficulty remains the same?

Copy link
Contributor

@cryptocode cryptocode Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only when it changes I think. Thoughts @zhyatt ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the call is sufficiently lightweight, I could see value in triggering it every cycle, specifically to ensure consistent data points for any tracking/visualizations, and also as a mechanism that allows watching for that consistency. If gaps are seen, this allows polling for difficulty separately through RPC as fallback or alerting of a possible issue (if these notifications aren't seen for a time period). Any additional thoughts @cryptocode or @guilhermelawless ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good idea, it's very light data especially considering the ~20 second period.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me if it simplifies clients and is low frequency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request_loop runs faster in testing - will publishing on every iteration have an impact on other tests if running faster?:

request_interval_ms = is_test_network () ? (is_sanitizer_build ? 100 : 20) : 16000;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a problem I believe, there is much higher frequency data being passed around. Tests should pass.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agree with @guilhermelawless, sending it every cycle should be fine.

trended_active_difficulty = difficulty;
if (notify_change)
{
difficulty_observer (trended_active_difficulty);
}
}

uint64_t nano::active_transactions::active_difficulty ()
Expand Down
1 change: 1 addition & 0 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class active_transactions final
uint64_t trended_active_difficulty;
size_t priority_cementable_frontiers_size ();
boost::circular_buffer<double> difficulty_trend ();
std::function<void(uint64_t)> difficulty_observer;

private:
// Call action with confirmed block, may be different than what we started with
Expand Down
12 changes: 12 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ startup_time (std::chrono::steady_clock::now ())
network.disconnect_observer = [this]() {
observers.disconnect.notify ();
};
active.difficulty_observer = [this](uint64_t active_difficulty) {
observers.difficulty.notify (active_difficulty);
};
if (!config.callback_address.empty ())
{
observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
Expand Down Expand Up @@ -351,6 +354,15 @@ startup_time (std::chrono::steady_clock::now ())
this->websocket_server->broadcast (builder.stopped_election (hash_a));
}
});

observers.difficulty.add ([this](uint64_t active_difficulty) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::active_difficulty))
{
nano::websocket::message_builder builder;
auto msg (builder.difficulty_changed (network_params.network.publish_threshold, active_difficulty));
this->websocket_server->broadcast (msg);
}
});
}
observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
if (channel_a->get_type () == nano::transport::transport_type::udp)
Expand Down
1 change: 1 addition & 0 deletions nano/node/node_observers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class node_observers final
nano::observer_set<nano::account const &, bool> account_balance;
nano::observer_set<std::shared_ptr<nano::transport::channel>> endpoint;
nano::observer_set<> disconnect;
nano::observer_set<uint64_t> difficulty;
};

std::unique_ptr<seq_con_info_component> collect_seq_con_info (node_observers & node_observers, const std::string & name);
Expand Down
25 changes: 25 additions & 0 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ nano::websocket::topic to_topic (std::string topic_a)
{
topic = nano::websocket::topic::ack;
}
else if (topic_a == "active_difficulty")
{
topic = nano::websocket::topic::active_difficulty;
}

return topic;
}

Expand All @@ -338,6 +343,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "ack";
}
else if (topic_a == nano::websocket::topic::active_difficulty)
{
topic = "active_difficulty";
}
return topic;
}
}
Expand Down Expand Up @@ -616,6 +625,22 @@ nano::websocket::message nano::websocket::message_builder::vote_received (std::s
return message_l;
}

nano::websocket::message nano::websocket::message_builder::difficulty_changed (uint64_t publish_threshold, uint64_t difficulty_active)
{
nano::websocket::message message_l (nano::websocket::topic::active_difficulty);
set_common_fields (message_l);

// Active difficulty information
boost::property_tree::ptree difficulty_l;
difficulty_l.put ("network_minimum", nano::to_string_hex (publish_threshold));
difficulty_l.put ("network_current", nano::to_string_hex (difficulty_active));
auto multiplier = nano::difficulty::to_multiplier (difficulty_active, publish_threshold);
difficulty_l.put ("multiplier", nano::to_string (multiplier));

message_l.contents.add_child ("message", difficulty_l);
return message_l;
}

void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;
Expand Down
3 changes: 3 additions & 0 deletions nano/node/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace websocket
stopped_election,
/** A vote message **/
vote,
/** An active difficulty message */
active_difficulty,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
Expand Down Expand Up @@ -83,6 +85,7 @@ namespace websocket
message block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block, nano::election_status_type election_status_type_a);
message stopped_election (nano::block_hash const & hash_a);
message vote_received (std::shared_ptr<nano::vote> vote_a);
message difficulty_changed (uint64_t publish_threshold, uint64_t difficulty_active);

private:
/** Set the common fields for messages: timestamp and topic. */
Expand Down