Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets - subscribe to votes #1908

Merged
merged 4 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,170 @@ TEST (websocket, confirmation_options)

node1->stop ();
}

/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */
TEST (websocket, vote)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);

// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread ([&system, &client_thread_finished]() {
// This will expect two results: the acknowledgement of the subscription
// and then the vote message
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));

// Quick-confirm a block
nano::keypair key;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);

// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

node1->stop ();
}

/** Tests vote subscription options */
TEST (websocket, vote_options)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);

// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread ([&system, &client_thread_finished]() {
std::ostringstream data;
data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json"
<< nano::test_genesis_key.pub.to_account ()
<< R"json("]}})json";
auto response = websocket_test_call (system.io_ctx, "::1", "24078", data.str (), true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));

// Quick-confirm a block
nano::keypair key;
auto balance = nano::genesis_amount;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto send_amount = node1->config.online_weight_minimum.number () + 1;
auto confirm_block = [&]() {
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
};
confirm_block ();

// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

std::atomic<bool> client_thread_2_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s);

// No response expected given the filter
ASSERT_FALSE (response);
client_thread_2_finished = true;
});
client_thread_2.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));

// Confirm another block
confirm_block ();

// No response expected
system.deadline_set (5s);
while (!client_thread_2_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

node1->stop ();
}
11 changes: 11 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,17 @@ startup_time (std::chrono::steady_clock::now ())
}
}
});
if (this->websocket_server)
{
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
if (this->websocket_server->any_subscribers (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote_a));
this->websocket_server->broadcast (msg);
}
});
}
if (NANO_VERSION_PATCH == 0)
{
logger.always_log ("Node starting, version: ", NANO_MAJOR_MINOR_VERSION);
Expand Down
77 changes: 67 additions & 10 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess
return should_filter_l;
}

nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a)
{
auto representatives_l (options_a.get_child_optional ("representatives"));
if (representatives_l)
{
for (auto representative_l : *representatives_l)
{
// Check if the representative is valid, but no error handling if it's not, simply not added to the filter
nano::account result_l (0);
if (!result_l.decode_account (representative_l.second.data ()))
{
// Do not insert the given raw data to keep old prefix support
representatives.insert (result_l.to_account ());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log if the account doesn't decode correctly? It seems like on a type-o someone could be confused as to why it's not working.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes more sense to log, thanks! Also logging if the resulting filter is empty.

Note to change documentation which warned about this.

}
}
}

bool nano::websocket::vote_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
auto representative_text_l (message_a.contents.get<std::string> ("message.account"));
if (representatives.find (representative_text_l) != representatives.end ())
{
should_filter_l = false;
}
return should_filter_l;
}

nano::websocket::session::session (nano::websocket::listener & listener_a, boost::asio::ip::tcp::socket socket_a) :
ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_executor ())
{
Expand Down Expand Up @@ -182,6 +211,10 @@ nano::websocket::topic to_topic (std::string topic_a)
{
topic = nano::websocket::topic::confirmation;
}
else if (topic_a == "vote")
{
topic = nano::websocket::topic::vote;
}
else if (topic_a == "ack")
{
topic = nano::websocket::topic::ack;
Expand All @@ -196,6 +229,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "confirmation";
}
else if (topic_a == nano::websocket::topic::vote)
{
topic = "vote";
}
else if (topic_a == nano::websocket::topic::ack)
{
topic = "ack";
Expand Down Expand Up @@ -233,6 +270,10 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::confirmation_options> (options_l.get (), ws_listener.get_node ()) : std::make_unique<nano::websocket::options> ()));
}
else if (topic_l == nano::websocket::topic::vote)
{
subscriptions.insert (std::make_pair (topic_l, options_l ? std::make_unique<nano::websocket::vote_options> (options_l.get ()) : std::make_unique<nano::websocket::options> ()));
}
else
{
subscriptions.insert (std::make_pair (topic_l, std::make_unique<nano::websocket::options> ()));
Expand Down Expand Up @@ -361,14 +402,8 @@ void nano::websocket::listener::decrease_subscription_count (nano::websocket::to

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
{
nano::websocket::message msg (nano::websocket::topic::confirmation);
using namespace std::chrono;
auto milli_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ();

// Common message information
boost::property_tree::ptree & message_l = msg.contents;
message_l.add ("topic", from_topic (msg.topic));
message_l.add ("time", std::to_string (milli_since_epoch));
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);

// Block confirmation properties
boost::property_tree::ptree message_node_l;
Expand All @@ -382,9 +417,31 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
block_node_l.add ("subtype", subtype);
}
message_node_l.add_child ("block", block_node_l);
message_l.add_child ("message", message_node_l);
message_l.contents.add_child ("message", message_node_l);

return msg;
return message_l;
}

nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr<nano::vote> vote_a)
{
nano::websocket::message message_l (nano::websocket::topic::vote);
set_common_fields (message_l);

// Vote information
boost::property_tree::ptree vote_node_l;
vote_a->serialize_json (vote_node_l);
message_l.contents.add_child ("message", vote_node_l);
return message_l;
}

void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;
auto milli_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ();

// Common message information
message_a.contents.add ("topic", from_topic (message_a.topic));
message_a.contents.add ("time", std::to_string (milli_since_epoch));
}

std::string nano::websocket::message::to_string () const
Expand Down
30 changes: 30 additions & 0 deletions nano/node/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace websocket
ack,
/** A confirmation message */
confirmation,
/** A vote message **/
vote,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
Expand Down Expand Up @@ -65,6 +67,11 @@ namespace websocket
{
public:
message block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype);
message vote_received (std::shared_ptr<nano::vote> vote_a);

private:
/** Set the common fields for messages: timestamp and topic. */
void set_common_fields (message & message_a);
};

/** Filtering options for subscriptions */
Expand Down Expand Up @@ -111,6 +118,29 @@ namespace websocket
std::unordered_set<std::string> accounts;
};

/**
* Filtering options for vote subscriptions
* Possible filtering options:
* * "representatives" (array of std::strings) - will only broadcast votes from these representatives
* @remark No error is shown if any given representative is invalid, the entry is simply ignored
*/
class vote_options final : public options
{
public:
vote_options ();
vote_options (boost::property_tree::ptree const & options_a);

/**
* Checks if a message should be filtered for given vote received options.
* @param message_a the message to be checked
* @return false if the message should be broadcasted, true if it should be filtered
*/
bool should_filter (message const & message_a) const override;

private:
std::unordered_set<std::string> representatives;
};

/** A websocket session managing its own lifetime */
class session final : public std::enable_shared_from_this<session>
{
Expand Down