Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets - check for subscriptions before proceeding #1906

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TEST (websocket, confirmation)

// Start websocket test-client in a separate thread
std::atomic<bool> confirmation_event_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
Expand All @@ -91,6 +92,7 @@ TEST (websocket, confirmation)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));

// Quick-confirm a block
nano::keypair key;
Expand Down
39 changes: 21 additions & 18 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1124,31 +1124,34 @@ startup_time (std::chrono::steady_clock::now ())
if (websocket_server)
{
observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
if (this->block_arrival.recent (block_a->hash ()))
if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation))
{
std::string subtype;
if (is_state_send_a)
{
subtype = "send";
}
else if (block_a->type () == nano::block_type::state)
if (this->block_arrival.recent (block_a->hash ()))
{
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
std::string subtype;
if (is_state_send_a)
{
subtype = "epoch";
subtype = "send";
}
else
else if (block_a->type () == nano::block_type::state)
{
subtype = "receive";
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
{
subtype = "epoch";
}
else
{
subtype = "receive";
}
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
});
}
Expand Down
39 changes: 34 additions & 5 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_execut

nano::websocket::session::~session ()
{
{
std::unique_lock<std::mutex> lk (subscriptions_mutex);
for (auto & subscription : subscriptions)
{
ws_listener.decrease_subscription_count (subscription);
}
}

ws_listener.get_node ().logger.try_log ("websocket session ended");
}

Expand Down Expand Up @@ -168,20 +176,24 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
auto topic_l (to_topic (message_a.get<std::string> ("topic", "")));
auto ack_l (message_a.get<bool> ("ack", false));
auto id_l (message_a.get<std::string> ("id", ""));
auto subscribe_succeeded (false);
auto action_succeeded (false);
if (action == "subscribe" && topic_l != nano::websocket::topic::invalid)
{
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.insert (topic_l);
subscribe_succeeded = true;
ws_listener.increase_subscription_count (topic_l);
action_succeeded = true;
}
else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid)
{
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.erase (topic_l);
subscribe_succeeded = true;
if (subscriptions.erase (topic_l))
{
ws_listener.decrease_subscription_count (topic_l);
}
action_succeeded = true;
}
if (ack_l && subscribe_succeeded)
if (ack_l && action_succeeded)
{
send_ack (action, id_l);
}
Expand Down Expand Up @@ -274,6 +286,23 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a)
sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ());
}

bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a)
{
return topic_subscription_count[static_cast<std::size_t> (topic_a)] > 0;
}

void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a)
{
topic_subscription_count[static_cast<std::size_t> (topic_a)] += 1;
}

void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a)
{
auto & count (topic_subscription_count[static_cast<std::size_t> (topic_a)]);
release_assert (count > 0);
count -= 1;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
{
nano::websocket::message msg (nano::websocket::topic::confirmation);
Expand Down
16 changes: 15 additions & 1 deletion nano/node/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ namespace websocket
/** Acknowledgement of prior incoming message */
ack,
/** A confirmation message */
confirmation
confirmation,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
constexpr size_t number_topics{ static_cast<size_t> (topic::_length) - static_cast<size_t> (topic::invalid) };

/** A message queued for broadcasting */
class message final
Expand Down Expand Up @@ -143,12 +146,23 @@ namespace websocket
return node;
}

/**
* Per-topic subscribers check. Relies on all sessions correctly increasing and
* decreasing the subscriber counts themselves.
*/
bool any_subscribers (nano::websocket::topic const & topic_a);
/** Adds to subscription count of a specific topic*/
void increase_subscription_count (nano::websocket::topic const & topic_a);
/** Removes from subscription count of a specific topic*/
void decrease_subscription_count (nano::websocket::topic const & topic_a);

private:
nano::node & node;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::socket socket;
std::mutex sessions_mutex;
std::vector<std::weak_ptr<session>> sessions;
std::array<std::atomic<std::size_t>, number_topics> topic_subscription_count{};
std::atomic<bool> stopped{ false };
};
}
Expand Down