diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 4062a94736..eef5efc16e 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -335,3 +335,170 @@ TEST (websocket, confirmation_options) node1->stop (); } + +/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */ +TEST (websocket, vote) +{ + nano::system system (24000, 1); + nano::node_init init1; + nano::node_config config; + nano::node_flags node_flags; + config.websocket_config.enabled = true; + config.websocket_config.port = 24078; + + auto node1 (std::make_shared (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); + nano::uint256_union wallet; + nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ()); + node1->wallets.create (wallet); + node1->start (); + system.nodes.push_back (node1); + + // Start websocket test-client in a separate thread + ack_ready = false; + std::atomic client_thread_finished{ false }; + ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + std::thread client_thread ([&system, &client_thread_finished]() { + // This will expect two results: the acknowledgement of the subscription + // and then the vote message + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true); + + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response; + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "vote"); + client_thread_finished = true; + }); + client_thread.detach (); + + // Wait for the subscription to be acknowledged + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + + // Quick-confirm a block + nano::keypair key; + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + + // Wait for the websocket client to receive the vote message + system.deadline_set (5s); + while (!client_thread_finished) + { + ASSERT_NO_ERROR (system.poll ()); + } + + node1->stop (); +} + +/** Tests vote subscription options */ +TEST (websocket, vote_options) +{ + nano::system system (24000, 1); + nano::node_init init1; + nano::node_config config; + nano::node_flags node_flags; + config.websocket_config.enabled = true; + config.websocket_config.port = 24078; + + auto node1 (std::make_shared (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); + nano::uint256_union wallet; + nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ()); + node1->wallets.create (wallet); + node1->start (); + system.nodes.push_back (node1); + + // Start websocket test-client in a separate thread + ack_ready = false; + std::atomic client_thread_finished{ false }; + ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + std::thread client_thread ([&system, &client_thread_finished]() { + std::ostringstream data; + data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json" + << nano::test_genesis_key.pub.to_account () + << R"json("]}})json"; + auto response = websocket_test_call (system.io_ctx, "::1", "24078", data.str (), true, true); + + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response; + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "vote"); + client_thread_finished = true; + }); + client_thread.detach (); + + // Wait for the subscription to be acknowledged + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + + // Quick-confirm a block + nano::keypair key; + auto balance = nano::genesis_amount; + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + auto send_amount = node1->config.online_weight_minimum.number () + 1; + auto confirm_block = [&]() { + nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); + balance -= send_amount; + auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous))); + node1->process_active (send); + }; + confirm_block (); + + // Wait for the websocket client to receive the vote message + system.deadline_set (5s); + while (!client_thread_finished) + { + ASSERT_NO_ERROR (system.poll ()); + } + + std::atomic client_thread_2_finished{ false }; + ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + std::thread client_thread_2 ([&system, &client_thread_2_finished]() { + auto response = websocket_test_call (system.io_ctx, "::1", "24078", + R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s); + + // No response expected given the filter + ASSERT_FALSE (response); + client_thread_2_finished = true; + }); + client_thread_2.detach (); + + // Wait for the subscription to be acknowledged + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ack_ready = false; + + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote)); + + // Confirm another block + confirm_block (); + + // No response expected + system.deadline_set (5s); + while (!client_thread_2_finished) + { + ASSERT_NO_ERROR (system.poll ()); + } + + node1->stop (); +} \ No newline at end of file diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 4d0ac6f982..b269eb4447 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1220,6 +1220,17 @@ startup_time (std::chrono::steady_clock::now ()) } } }); + if (this->websocket_server) + { + observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr vote_a, std::shared_ptr channel_a) { + if (this->websocket_server->any_subscribers (nano::websocket::topic::vote)) + { + nano::websocket::message_builder builder; + auto msg (builder.vote_received (vote_a)); + this->websocket_server->broadcast (msg); + } + }); + } if (NANO_VERSION_PATCH == 0) { logger.always_log ("Node starting, version: ", NANO_MAJOR_MINOR_VERSION); diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index b99d06e4a4..879f186566 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -13,15 +13,23 @@ node (node_a) { for (auto account_l : *accounts_l) { - // Check if the account is valid, but no error handling if it's not, simply not added to the filter nano::account result_l (0); if (!result_l.decode_account (account_l.second.data ())) { // Do not insert the given raw data to keep old prefix support accounts.insert (result_l.to_account ()); } + else + { + node.logger.always_log (boost::str (boost::format ("Websocket: invalid account provided for filtering blocks: %1%") % account_l.second.data ())); + } } } + // Warn the user if the options resulted in an empty filter + if (!all_local_accounts && accounts.empty ()) + { + node.logger.always_log ("Websocket: provided options resulted in an empty block confirmation filter"); + } } bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const @@ -51,11 +59,49 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess return should_filter_l; } +nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a, nano::node & node_a) : +node (node_a) +{ + auto representatives_l (options_a.get_child_optional ("representatives")); + if (representatives_l) + { + for (auto representative_l : *representatives_l) + { + nano::account result_l (0); + if (!result_l.decode_account (representative_l.second.data ())) + { + // Do not insert the given raw data to keep old prefix support + representatives.insert (result_l.to_account ()); + } + else + { + node.logger.always_log (boost::str (boost::format ("Websocket: invalid account given to filter votes: %1%") % representative_l.second.data ())); + } + } + } + // Warn the user if the options resulted in an empty filter + if (representatives.empty ()) + { + node.logger.always_log ("Websocket: provided options resulted in an empty vote filter"); + } +} + +bool nano::websocket::vote_options::should_filter (nano::websocket::message const & message_a) const +{ + bool should_filter_l (true); + auto representative_text_l (message_a.contents.get ("message.account")); + if (representatives.find (representative_text_l) != representatives.end ()) + { + should_filter_l = false; + } + return should_filter_l; +} + nano::websocket::session::session (nano::websocket::listener & listener_a, boost::asio::ip::tcp::socket socket_a) : ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ()) { ws.text (true); - ws_listener.get_node ().logger.try_log ("websocket session started"); + ws_listener.get_node ().logger.try_log ("Websocket: session started"); } nano::websocket::session::~session () @@ -68,7 +114,7 @@ nano::websocket::session::~session () } } - ws_listener.get_node ().logger.try_log ("websocket session ended"); + ws_listener.get_node ().logger.try_log ("Websocket: session ended"); } void nano::websocket::session::handshake () @@ -82,7 +128,7 @@ void nano::websocket::session::handshake () } else { - self_l->ws_listener.get_node ().logger.always_log ("websocket handshake failed: ", ec.message ()); + self_l->ws_listener.get_node ().logger.always_log ("Websocket: handshake failed: ", ec.message ()); } }); } @@ -163,12 +209,12 @@ void nano::websocket::session::read () } catch (boost::property_tree::json_parser::json_parser_error const & ex) { - self_l->ws_listener.get_node ().logger.try_log ("websocket json parsing failed: ", ex.what ()); + self_l->ws_listener.get_node ().logger.try_log ("Websocket: json parsing failed: ", ex.what ()); } } else { - self_l->ws_listener.get_node ().logger.try_log ("websocket read failed: ", ec.message ()); + self_l->ws_listener.get_node ().logger.try_log ("Websocket: read failed: ", ec.message ()); } }); } @@ -182,6 +228,10 @@ nano::websocket::topic to_topic (std::string topic_a) { topic = nano::websocket::topic::confirmation; } + else if (topic_a == "vote") + { + topic = nano::websocket::topic::vote; + } else if (topic_a == "ack") { topic = nano::websocket::topic::ack; @@ -196,6 +246,10 @@ std::string from_topic (nano::websocket::topic topic_a) { topic = "confirmation"; } + else if (topic_a == nano::websocket::topic::vote) + { + topic = "vote"; + } else if (topic_a == nano::websocket::topic::ack) { topic = "ack"; @@ -233,6 +287,10 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const { subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique (options_l.get (), ws_listener.get_node ()) : std::make_unique ())); } + else if (topic_l == nano::websocket::topic::vote) + { + subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique (options_l.get (), ws_listener.get_node ()) : std::make_unique ())); + } else { subscriptions.insert (std::make_pair (topic_l, std::make_unique ())); @@ -284,7 +342,7 @@ socket (node_a.io_ctx) } catch (std::exception const & ex) { - node.logger.always_log ("websocket listen failed: ", ex.what ()); + node.logger.always_log ("Websocket: listen failed: ", ex.what ()); } } @@ -308,7 +366,7 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec) { if (ec) { - node.logger.always_log ("websocket accept failed: ", ec.message ()); + node.logger.always_log ("Websocket: accept failed: ", ec.message ()); } else { @@ -361,14 +419,8 @@ void nano::websocket::listener::decrease_subscription_count (nano::websocket::to nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype) { - nano::websocket::message msg (nano::websocket::topic::confirmation); - using namespace std::chrono; - auto milli_since_epoch = std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (); - - // Common message information - boost::property_tree::ptree & message_l = msg.contents; - message_l.add ("topic", from_topic (msg.topic)); - message_l.add ("time", std::to_string (milli_since_epoch)); + nano::websocket::message message_l (nano::websocket::topic::confirmation); + set_common_fields (message_l); // Block confirmation properties boost::property_tree::ptree message_node_l; @@ -382,9 +434,31 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std: block_node_l.add ("subtype", subtype); } message_node_l.add_child ("block", block_node_l); - message_l.add_child ("message", message_node_l); + message_l.contents.add_child ("message", message_node_l); + + return message_l; +} + +nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr vote_a) +{ + nano::websocket::message message_l (nano::websocket::topic::vote); + set_common_fields (message_l); + + // Vote information + boost::property_tree::ptree vote_node_l; + vote_a->serialize_json (vote_node_l); + message_l.contents.add_child ("message", vote_node_l); + return message_l; +} + +void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a) +{ + using namespace std::chrono; + auto milli_since_epoch = std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (); - return msg; + // Common message information + message_a.contents.add ("topic", from_topic (message_a.topic)); + message_a.contents.add ("time", std::to_string (milli_since_epoch)); } std::string nano::websocket::message::to_string () const diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index b046e231b6..7523c31eb0 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -37,6 +37,8 @@ namespace websocket ack, /** A confirmation message */ confirmation, + /** A vote message **/ + vote, /** Auxiliary length, not a valid topic, must be the last enum */ _length }; @@ -65,6 +67,11 @@ namespace websocket { public: message block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype); + message vote_received (std::shared_ptr vote_a); + + private: + /** Set the common fields for messages: timestamp and topic. */ + void set_common_fields (message & message_a); }; /** Filtering options for subscriptions */ @@ -89,7 +96,6 @@ namespace websocket * * "all_local_accounts" (bool) - will only not filter blocks that have local wallet accounts as source/destination * * "accounts" (array of std::strings) - will only not filter blocks that have these accounts as source/destination * @remark Both options can be given, the resulting filter is an intersection of individual filters - * @remark No error is shown if any given account is invalid, the entry is simply ignored * @warn Legacy blocks are always filtered (not broadcasted) */ class confirmation_options final : public options @@ -111,6 +117,29 @@ namespace websocket std::unordered_set accounts; }; + /** + * Filtering options for vote subscriptions + * Possible filtering options: + * * "representatives" (array of std::strings) - will only broadcast votes from these representatives + */ + class vote_options final : public options + { + public: + vote_options (); + vote_options (boost::property_tree::ptree const & options_a, nano::node & node_a); + + /** + * Checks if a message should be filtered for given vote received options. + * @param message_a the message to be checked + * @return false if the message should be broadcasted, true if it should be filtered + */ + bool should_filter (message const & message_a) const override; + + private: + nano::node & node; + std::unordered_set representatives; + }; + /** A websocket session managing its own lifetime */ class session final : public std::enable_shared_from_this {