Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets - subscribe to votes #1908

Merged
merged 4 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,170 @@ TEST (websocket, confirmation_options)

node1->stop ();
}

/** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */
TEST (websocket, vote)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);

// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread ([&system, &client_thread_finished]() {
// This will expect two results: the acknowledgement of the subscription
// and then the vote message
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));

// Quick-confirm a block
nano::keypair key;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);

// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

node1->stop ();
}

/** Tests vote subscription options */
TEST (websocket, vote_options)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);

// Start websocket test-client in a separate thread
ack_ready = false;
std::atomic<bool> client_thread_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread ([&system, &client_thread_finished]() {
std::ostringstream data;
data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json"
<< nano::test_genesis_key.pub.to_account ()
<< R"json("]}})json";
auto response = websocket_test_call (system.io_ctx, "::1", "24078", data.str (), true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "vote");
client_thread_finished = true;
});
client_thread.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));

// Quick-confirm a block
nano::keypair key;
auto balance = nano::genesis_amount;
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto send_amount = node1->config.online_weight_minimum.number () + 1;
auto confirm_block = [&]() {
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
balance -= send_amount;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, balance, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);
};
confirm_block ();

// Wait for the websocket client to receive the vote message
system.deadline_set (5s);
while (!client_thread_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

std::atomic<bool> client_thread_2_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s);

// No response expected given the filter
ASSERT_FALSE (response);
client_thread_2_finished = true;
});
client_thread_2.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ack_ready = false;

ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));

// Confirm another block
confirm_block ();

// No response expected
system.deadline_set (5s);
while (!client_thread_2_finished)
{
ASSERT_NO_ERROR (system.poll ());
}

node1->stop ();
}
11 changes: 11 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,17 @@ startup_time (std::chrono::steady_clock::now ())
}
}
});
if (this->websocket_server)
{
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
if (this->websocket_server->any_subscribers (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
auto msg (builder.vote_received (vote_a));
this->websocket_server->broadcast (msg);
}
});
}
if (NANO_VERSION_PATCH == 0)
{
logger.always_log ("Node starting, version: ", NANO_MAJOR_MINOR_VERSION);
Expand Down
Loading