From ac89b6490ce772bf2493492e1c0a23c5f2cf4549 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Mon, 15 Apr 2019 08:37:00 +0100 Subject: [PATCH] Websockets - check for subscriptions before proceeding (#1906) * Check if any subscriptions exist before creating a message for websocket broadcasting * Mutex lock * Change to any_subscribers * Forgot test renaming * Change map to a fixed size array, remove mutex by changing to atomic --- nano/core_test/websocket.cpp | 2 ++ nano/node/node.cpp | 39 +++++++++++++++++++----------------- nano/node/websocket.cpp | 39 +++++++++++++++++++++++++++++++----- nano/node/websocket.hpp | 16 ++++++++++++++- 4 files changed, 72 insertions(+), 24 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 7dc2c30f81..e6e2b96509 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -70,6 +70,7 @@ TEST (websocket, confirmation) // Start websocket test-client in a separate thread std::atomic confirmation_event_received{ false }; + ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); std::thread client_thread ([&system, &confirmation_event_received]() { // This will expect two results: the acknowledgement of the subscription // and then the block confirmation message @@ -91,6 +92,7 @@ TEST (websocket, confirmation) { ASSERT_NO_ERROR (system.poll ()); } + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); // Quick-confirm a block nano::keypair key; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 62579a1c67..585f734b82 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1124,31 +1124,34 @@ startup_time (std::chrono::steady_clock::now ()) if (websocket_server) { observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { - if (this->block_arrival.recent (block_a->hash ())) + if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation)) { - std::string subtype; - if (is_state_send_a) - { - subtype = "send"; - } - else if (block_a->type () == nano::block_type::state) + if (this->block_arrival.recent (block_a->hash ())) { - if (block_a->link ().is_zero ()) - { - subtype = "change"; - } - else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ())) + std::string subtype; + if (is_state_send_a) { - subtype = "epoch"; + subtype = "send"; } - else + else if (block_a->type () == nano::block_type::state) { - subtype = "receive"; + if (block_a->link ().is_zero ()) + { + subtype = "change"; + } + else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ())) + { + subtype = "epoch"; + } + else + { + subtype = "receive"; + } } + nano::websocket::message_builder builder; + auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype)); + this->websocket_server->broadcast (msg); } - nano::websocket::message_builder builder; - auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype)); - this->websocket_server->broadcast (msg); } }); } diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 9c5c54148b..482b71d074 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -13,6 +13,14 @@ ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_execut nano::websocket::session::~session () { + { + std::unique_lock lk (subscriptions_mutex); + for (auto & subscription : subscriptions) + { + ws_listener.decrease_subscription_count (subscription); + } + } + ws_listener.get_node ().logger.try_log ("websocket session ended"); } @@ -168,20 +176,24 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const auto topic_l (to_topic (message_a.get ("topic", ""))); auto ack_l (message_a.get ("ack", false)); auto id_l (message_a.get ("id", "")); - auto subscribe_succeeded (false); + auto action_succeeded (false); if (action == "subscribe" && topic_l != nano::websocket::topic::invalid) { std::lock_guard lk (subscriptions_mutex); subscriptions.insert (topic_l); - subscribe_succeeded = true; + ws_listener.increase_subscription_count (topic_l); + action_succeeded = true; } else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid) { std::lock_guard lk (subscriptions_mutex); - subscriptions.erase (topic_l); - subscribe_succeeded = true; + if (subscriptions.erase (topic_l)) + { + ws_listener.decrease_subscription_count (topic_l); + } + action_succeeded = true; } - if (ack_l && subscribe_succeeded) + if (ack_l && action_succeeded) { send_ack (action, id_l); } @@ -274,6 +286,23 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a) sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ()); } +bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a) +{ + return topic_subscription_count[static_cast (topic_a)] > 0; +} + +void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a) +{ + topic_subscription_count[static_cast (topic_a)] += 1; +} + +void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a) +{ + auto & count (topic_subscription_count[static_cast (topic_a)]); + release_assert (count > 0); + count -= 1; +} + nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype) { nano::websocket::message msg (nano::websocket::topic::confirmation); diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 842e672498..b523ccb776 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -35,8 +35,11 @@ namespace websocket /** Acknowledgement of prior incoming message */ ack, /** A confirmation message */ - confirmation + confirmation, + /** Auxiliary length, not a valid topic, must be the last enum */ + _length }; + constexpr size_t number_topics{ static_cast (topic::_length) - static_cast (topic::invalid) }; /** A message queued for broadcasting */ class message final @@ -143,12 +146,23 @@ namespace websocket return node; } + /** + * Per-topic subscribers check. Relies on all sessions correctly increasing and + * decreasing the subscriber counts themselves. + */ + bool any_subscribers (nano::websocket::topic const & topic_a); + /** Adds to subscription count of a specific topic*/ + void increase_subscription_count (nano::websocket::topic const & topic_a); + /** Removes from subscription count of a specific topic*/ + void decrease_subscription_count (nano::websocket::topic const & topic_a); + private: nano::node & node; boost::asio::ip::tcp::acceptor acceptor; boost::asio::ip::tcp::socket socket; std::mutex sessions_mutex; std::vector> sessions; + std::array, number_topics> topic_subscription_count{}; std::atomic stopped{ false }; }; }