Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets - filtering options #1907

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2a33ca7
Add subscription filters for accounts in wallets or a custom list of …
Apr 14, 2019
3b41599
Small syntax
Apr 14, 2019
0d31c69
Another small leftover
Apr 14, 2019
ef20645
Change to a set of strings, add an assert
Apr 14, 2019
c9d46be
Merge remote-tracking branch 'origin/master' into websockets/filterin…
Apr 15, 2019
a668332
Inverse logic for decode_account assert
Apr 15, 2019
19d7d1d
Remove virtual semantics from the final classes, add documentation
Apr 15, 2019
9c23c30
Remove nano::node passing through functions, add as a member in optio…
Apr 15, 2019
5fd7499
Check destination account as well
Apr 15, 2019
63e23a7
Change to should_filter
Apr 15, 2019
00b7a8a
Add test for confirmation options
Apr 15, 2019
2b759cc
Check for subscribers
Apr 16, 2019
eba699d
Tests more consistent
Apr 16, 2019
354dd4f
Test unsubscribe in the first test instead
Apr 16, 2019
13ebb49
should_filter takes the whole message instead of contents, make messa…
Apr 16, 2019
3c9773c
Check for both destination possiblities, assert to check if none found
Apr 16, 2019
765c053
Fix all tests
Apr 16, 2019
bde6ef3
Sanitize local variables and add should_filter_l return variable
Apr 17, 2019
6f3a7de
Always use async_read with a cv wait instead of sleep
Apr 17, 2019
368d671
No early return
Apr 17, 2019
32c63b8
Cancel timer safely
Apr 17, 2019
18d01ad
Post review adjustments
Apr 17, 2019
0ac6d0f
Fix repeated tests failing due to ack_ready not being reset to false
Apr 17, 2019
e979b7a
release_assert
Apr 17, 2019
6fa90d5
Legacy blocks are always filtered, fix asserts
Apr 17, 2019
670a386
Merge branch 'master' into websockets/filtering-accounts
Apr 17, 2019
f51bbc5
Add tests for legacy blocks - without filtering options they are broa…
Apr 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 168 additions & 18 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include <boost/asio.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <gtest/gtest.h>
#include <iostream>
Expand All @@ -20,10 +22,17 @@ using namespace std::chrono_literals;

namespace
{
/** This variable must be set to false before setting up every thread that makes a websocket test call (and needs ack), to be safe */
std::atomic<bool> ack_ready{ false };
/** A simple blocking websocket client for testing */
std::string websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack)

/** An optionally blocking websocket client for testing */
boost::optional<std::string> websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, seconds response_deadline = 5s)
{
if (await_ack)
{
ack_ready = false;
}

boost::asio::ip::tcp::resolver resolver{ ioc };
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws{ ioc };

Expand All @@ -41,13 +50,46 @@ std::string websocket_test_call (boost::asio::io_context & ioc, std::string host
ack_ready = true;
}

boost::beast::flat_buffer buffer;
ws.read (buffer);
std::ostringstream res;
res << boost::beast::buffers (buffer.data ());
boost::optional<std::string> ret;

ws.close (boost::beast::websocket::close_code::normal);
return res.str ();
if (await_response)
{
boost::asio::deadline_timer timer (ioc);
std::atomic<bool> timed_out{ false }, got_response{ false };
std::mutex cond_mutex;
std::condition_variable cond_var;
timer.expires_from_now (boost::posix_time::seconds (response_deadline.count ()));
timer.async_wait ([&ws, &cond_mutex, &cond_var, &timed_out](boost::system::error_code const & ec) {
if (!ec)
{
std::unique_lock<std::mutex> lock (cond_mutex);
ws.next_layer ().cancel ();
timed_out = true;
cond_var.notify_one ();
}
});

boost::beast::flat_buffer buffer;
ws.async_read (buffer, [&ret, &buffer, &cond_mutex, &cond_var, &got_response](boost::beast::error_code const & ec, std::size_t const n) {
if (!ec)
{
std::unique_lock<std::mutex> lock (cond_mutex);
std::ostringstream res;
res << boost::beast::buffers (buffer.data ());
ret = res.str ();
got_response = true;
cond_var.notify_one ();
}
});
std::unique_lock<std::mutex> lock (cond_mutex);
cond_var.wait (lock, [&] { return timed_out || got_response; });
if (got_response)
{
timer.cancel ();
ws.close (boost::beast::websocket::close_code::normal);
}
}
return ret;
}
}

Expand All @@ -69,20 +111,26 @@ TEST (websocket, confirmation)
system.nodes.push_back (node1);

// Start websocket test-client in a separate thread
std::atomic<bool> confirmation_event_received{ false };
ack_ready = false;
std::atomic<bool> unsubscribe_ack_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &confirmation_event_received]() {
std::thread client_thread ([&system, &unsubscribe_ack_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
std::string response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true);
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
confirmation_event_received = true;

// Unsubscribe action, expects an acknowledge but no response follows
websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
unsubscribe_ack_received = true;
});
client_thread.detach ();

Expand All @@ -92,19 +140,121 @@ TEST (websocket, confirmation)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));

// Quick-confirm a block
nano::keypair key;
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
system.wallet (1)->insert_adhoc (key.prv);
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto send (std::make_shared<nano::send_block> (previous, key.pub, node1->config.online_weight_minimum.number () + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, node1->config.online_weight_minimum.number () + 1, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);

// Wait for the websocket client to receive the confirmation message
// Wait for the unsubscribe action to be acknowledged
system.deadline_set (5s);
while (!unsubscribe_ack_received)
{
ASSERT_NO_ERROR (system.poll ());
}

node1->stop ();
}

/** Tests the filtering options of block confirmations */
TEST (websocket, confirmation_options)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);

system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;

// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &client_thread_finished]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"accounts": ["xrb_invalid"]}})json", true, true, 1s);

ASSERT_FALSE (response);
client_thread_finished = true;
});
client_thread.detach ();

// Wait for subscribe acknowledgement
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

// Confirms a random block for an in-wallet account
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}

// Wait for client thread to finish, no confirmation message should be received with given filter
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

ack_ready = false;
std::atomic<bool> client_thread_2_finished{ false };
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
// Re-subscribe with options for all local wallet accounts
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response.get ();
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");

client_thread_2_finished = true;
});
client_thread_2.detach ();

// Wait for the subscribe action to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));

// Quick-confirm another block
{
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - 2 * (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
}

// Wait for confirmation message
system.deadline_set (5s);
while (!confirmation_event_received)
while (!client_thread_2_finished)
{
ASSERT_NO_ERROR (system.poll ());
}
Expand Down
64 changes: 60 additions & 4 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,53 @@
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>

nano::websocket::confirmation_options::confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
all_local_accounts (options_a.get<bool> ("all_local_accounts", false)),
node (node_a)
{
auto accounts_l (options_a.get_child_optional ("accounts"));
if (accounts_l)
{
for (auto account_l : *accounts_l)
{
// Check if the account is valid, but no error handling if it's not, simply not added to the filter
nano::account result_l (0);
if (!result_l.decode_account (account_l.second.data ()))
{
// Do not insert the given raw data to keep old prefix support
accounts.insert (result_l.to_account ());
}
}
}
}

bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
auto destination_opt_l (message_a.contents.get_optional<std::string> ("message.block.link_as_account"));
if (destination_opt_l)
{
auto source_text_l (message_a.contents.get<std::string> ("message.account"));
if (all_local_accounts)
{
auto transaction_l (node.wallets.tx_begin_read ());
nano::account source_l (0), destination_l (0);
auto decode_source_ok_l (!source_l.decode_account (source_text_l));
auto decode_destination_ok_l (!destination_l.decode_account (destination_opt_l.get ()));
assert (decode_source_ok_l && decode_destination_ok_l);
if (node.wallets.exists (transaction_l, source_l) || node.wallets.exists (transaction_l, destination_l))
{
should_filter_l = false;
}
}
if (accounts.find (source_text_l) != accounts.end () || accounts.find (destination_opt_l.get ()) != accounts.end ())
{
should_filter_l = false;
}
}
return should_filter_l;
}

nano::websocket::session::session (nano::websocket::listener & listener_a, boost::asio::ip::tcp::socket socket_a) :
ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ())
{
Expand All @@ -17,7 +64,7 @@ nano::websocket::session::~session ()
std::unique_lock<std::mutex> lk (subscriptions_mutex);
for (auto & subscription : subscriptions)
{
ws_listener.decrease_subscription_count (subscription);
ws_listener.decrease_subscription_count (subscription.first);
}
}

Expand Down Expand Up @@ -54,7 +101,8 @@ void nano::websocket::session::write (nano::websocket::message message_a)
{
// clang-format off
std::unique_lock<std::mutex> lk (subscriptions_mutex);
if (message_a.topic == nano::websocket::topic::ack || subscriptions.find (message_a.topic) != subscriptions.end ())
auto subscription (subscriptions.find (message_a.topic));
if (message_a.topic == nano::websocket::topic::ack || (subscription != subscriptions.end () && !subscription->second->should_filter (message_a)))
{
lk.unlock ();
boost::asio::post (write_strand,
Expand Down Expand Up @@ -179,8 +227,16 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
auto action_succeeded (false);
if (action == "subscribe" && topic_l != nano::websocket::topic::invalid)
{
auto options_l (message_a.get_child_optional ("options"));
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.insert (topic_l);
if (topic_l == nano::websocket::topic::confirmation)
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::confirmation_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
}
else
{
subscriptions.insert (std::make_pair (topic_l, std::make_unique<nano::websocket::options> ()));
}
ws_listener.increase_subscription_count (topic_l);
action_succeeded = true;
}
Expand Down Expand Up @@ -331,7 +387,7 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
return msg;
}

std::string nano::websocket::message::to_string ()
std::string nano::websocket::message::to_string () const
{
std::ostringstream ostream;
boost::property_tree::write_json (ostream, contents);
Expand Down
Loading