Skip to content

Commit

Permalink
Websockets - check for subscriptions before proceeding (#1906)
Browse files Browse the repository at this point in the history
* Check if any subscriptions exist before creating a message for websocket broadcasting

* Mutex lock

* Change to any_subscribers

* Forgot test renaming

* Change map to a fixed size array, remove mutex by changing to atomic
  • Loading branch information
Guilherme Lawless authored and cryptocode committed Apr 15, 2019
1 parent e811971 commit ac89b64
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 24 deletions.
2 changes: 2 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TEST (websocket, confirmation)

// Start websocket test-client in a separate thread
std::atomic<bool> confirmation_event_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
Expand All @@ -91,6 +92,7 @@ TEST (websocket, confirmation)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));

// Quick-confirm a block
nano::keypair key;
Expand Down
39 changes: 21 additions & 18 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1124,31 +1124,34 @@ startup_time (std::chrono::steady_clock::now ())
if (websocket_server)
{
observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
if (this->block_arrival.recent (block_a->hash ()))
if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation))
{
std::string subtype;
if (is_state_send_a)
{
subtype = "send";
}
else if (block_a->type () == nano::block_type::state)
if (this->block_arrival.recent (block_a->hash ()))
{
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
std::string subtype;
if (is_state_send_a)
{
subtype = "epoch";
subtype = "send";
}
else
else if (block_a->type () == nano::block_type::state)
{
subtype = "receive";
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
{
subtype = "epoch";
}
else
{
subtype = "receive";
}
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
});
}
Expand Down
39 changes: 34 additions & 5 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_execut

nano::websocket::session::~session ()
{
{
std::unique_lock<std::mutex> lk (subscriptions_mutex);
for (auto & subscription : subscriptions)
{
ws_listener.decrease_subscription_count (subscription);
}
}

ws_listener.get_node ().logger.try_log ("websocket session ended");
}

Expand Down Expand Up @@ -168,20 +176,24 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
auto topic_l (to_topic (message_a.get<std::string> ("topic", "")));
auto ack_l (message_a.get<bool> ("ack", false));
auto id_l (message_a.get<std::string> ("id", ""));
auto subscribe_succeeded (false);
auto action_succeeded (false);
if (action == "subscribe" && topic_l != nano::websocket::topic::invalid)
{
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.insert (topic_l);
subscribe_succeeded = true;
ws_listener.increase_subscription_count (topic_l);
action_succeeded = true;
}
else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid)
{
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.erase (topic_l);
subscribe_succeeded = true;
if (subscriptions.erase (topic_l))
{
ws_listener.decrease_subscription_count (topic_l);
}
action_succeeded = true;
}
if (ack_l && subscribe_succeeded)
if (ack_l && action_succeeded)
{
send_ack (action, id_l);
}
Expand Down Expand Up @@ -274,6 +286,23 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a)
sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ());
}

bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a)
{
return topic_subscription_count[static_cast<std::size_t> (topic_a)] > 0;
}

void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a)
{
topic_subscription_count[static_cast<std::size_t> (topic_a)] += 1;
}

void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a)
{
auto & count (topic_subscription_count[static_cast<std::size_t> (topic_a)]);
release_assert (count > 0);
count -= 1;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
{
nano::websocket::message msg (nano::websocket::topic::confirmation);
Expand Down
16 changes: 15 additions & 1 deletion nano/node/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ namespace websocket
/** Acknowledgement of prior incoming message */
ack,
/** A confirmation message */
confirmation
confirmation,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
constexpr size_t number_topics{ static_cast<size_t> (topic::_length) - static_cast<size_t> (topic::invalid) };

/** A message queued for broadcasting */
class message final
Expand Down Expand Up @@ -143,12 +146,23 @@ namespace websocket
return node;
}

/**
* Per-topic subscribers check. Relies on all sessions correctly increasing and
* decreasing the subscriber counts themselves.
*/
bool any_subscribers (nano::websocket::topic const & topic_a);
/** Adds to subscription count of a specific topic*/
void increase_subscription_count (nano::websocket::topic const & topic_a);
/** Removes from subscription count of a specific topic*/
void decrease_subscription_count (nano::websocket::topic const & topic_a);

private:
nano::node & node;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::socket socket;
std::mutex sessions_mutex;
std::vector<std::weak_ptr<session>> sessions;
std::array<std::atomic<std::size_t>, number_topics> topic_subscription_count{};
std::atomic<bool> stopped{ false };
};
}
Expand Down

0 comments on commit ac89b64

Please sign in to comment.