Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket support #1840

Merged
merged 2 commits into from
Mar 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ add_executable (core_test
versioning.cpp
wallet.cpp
wallets.cpp
websocket.cpp
work_pool.cpp)

target_compile_definitions(core_test
Expand Down
109 changes: 109 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include <cstdlib>
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <nano/core_test/testutil.hpp>
#include <nano/node/testing.hpp>
#include <nano/node/websocket.hpp>
#include <sstream>
#include <string>
#include <vector>

using namespace std::chrono_literals;

namespace
{
std::atomic<bool> ack_ready{ false };
/** A simple blocking websocket client for testing */
std::string websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack)
{
boost::asio::ip::tcp::resolver resolver{ ioc };
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws{ ioc };

auto const results = resolver.resolve (host, port);
boost::asio::connect (ws.next_layer (), results.begin (), results.end ());

ws.handshake (host, "/");
ws.text (true);
ws.write (boost::asio::buffer (message_a));

if (await_ack)
{
boost::beast::flat_buffer buffer;
ws.read (buffer);
ack_ready = true;
}

boost::beast::flat_buffer buffer;
ws.read (buffer);
std::ostringstream res;
res << boost::beast::buffers (buffer.data ());

ws.close (boost::beast::websocket::close_code::normal);
return res.str ();
}
}

/** Subscribes to block confirmations, confirms a block and then awaits websocket notification */
TEST (websocket, confirmation)
{
nano::system system (24000, 1);
nano::node_init init1;
nano::node_config config;
nano::node_flags node_flags;
config.websocket_config.enabled = true;
config.websocket_config.port = 24078;

auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags));
nano::uint256_union wallet;
nano::random_pool::generate_block (wallet.bytes.data (), wallet.bytes.size ());
node1->wallets.create (wallet);
node1->start ();
system.nodes.push_back (node1);

// Start websocket test-client in a separate thread
std::atomic<bool> confirmation_event_received{ false };
std::thread client_thread ([&system, &confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
std::string response = websocket_test_call (system.io_ctx, "::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true);

boost::property_tree::ptree event;
std::stringstream stream;
stream << response;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");
confirmation_event_received = true;
});
client_thread.detach ();

// Wait for the subscription to be acknowledged
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Quick-confirm a block
nano::keypair key;
nano::block_hash previous (node1->latest (nano::test_genesis_key.pub));
system.wallet (1)->insert_adhoc (key.prv);
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
auto send (std::make_shared<nano::send_block> (previous, key.pub, node1->config.online_weight_minimum.number () + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (previous)));
node1->process_active (send);

// Wait for the websocket client to receive the confirmation message
system.deadline_set (5s);
while (!confirmation_event_received)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}
4 changes: 4 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ add_library (node
voting.cpp
wallet.hpp
wallet.cpp
websocket.hpp
websocket.cpp
websocketconfig.hpp
websocketconfig.cpp
working.hpp
xorshift.hpp)

Expand Down
42 changes: 42 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,13 @@ stats (config.stat_config),
vote_uniquer (block_uniquer),
startup_time (std::chrono::steady_clock::now ())
{
if (config.websocket_config.enabled)
{
auto endpoint_l (nano::tcp_endpoint (config.websocket_config.address, config.websocket_config.port));
websocket_server = std::make_shared<nano::websocket::listener> (*this, endpoint_l);
this->websocket_server->run ();
}

wallets.observer = [this](bool active) {
observers.wallet.notify (active);
};
Expand Down Expand Up @@ -1118,6 +1125,37 @@ startup_time (std::chrono::steady_clock::now ())
}
});
}
if (websocket_server)
{
observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
if (this->block_arrival.recent (block_a->hash ()))
{
std::string subtype;
if (is_state_send_a)
{
subtype = "send";
}
else if (block_a->type () == nano::block_type::state)
{
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
{
subtype = "epoch";
}
else
{
subtype = "receive";
}
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
});
}
observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
this->network.send_keepalive (*channel_a);
});
Expand Down Expand Up @@ -1544,6 +1582,10 @@ void nano::node::stop ()
vote_processor.stop ();
active.stop ();
network.stop ();
if (websocket_server)
{
websocket_server->stop ();
}
bootstrap_initiator.stop ();
bootstrap.stop ();
port_mapping.stop ();
Expand Down
2 changes: 2 additions & 0 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nano/node/stats.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/wallet.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/ledger.hpp>

#include <atomic>
Expand Down Expand Up @@ -478,6 +479,7 @@ class node : public std::enable_shared_from_this<nano::node>
boost::asio::io_context & io_ctx;
nano::network_params network_params;
nano::node_config config;
std::shared_ptr<nano::websocket::listener> websocket_server;
nano::node_flags flags;
nano::alarm & alarm;
nano::work_pool & work;
Expand Down
13 changes: 11 additions & 2 deletions nano/node/nodeconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const
json.put ("unchecked_cutoff_time", unchecked_cutoff_time.count ());
json.put ("tcp_client_timeout", tcp_client_timeout.count ());
json.put ("tcp_server_timeout", tcp_server_timeout.count ());

nano::jsonconfig websocket_l;
websocket_config.serialize_json (websocket_l);
json.put_child ("websocket", websocket_l);
nano::jsonconfig ipc_l;
ipc_config.serialize_json (ipc_l);
json.put_child ("ipc", ipc_l);
Expand Down Expand Up @@ -257,6 +259,9 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
}
case 16:
{
nano::jsonconfig websocket_l;
websocket_config.serialize_json (websocket_l);
json.put_child ("websocket", websocket_l);
json.put ("tcp_client_timeout", tcp_client_timeout.count ());
json.put ("tcp_server_timeout", tcp_server_timeout.count ());
upgraded = true;
Expand Down Expand Up @@ -372,7 +377,11 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
{
ipc_config.deserialize_json (ipc_config_l.get ());
}

auto websocket_config_l (json.get_optional_child ("websocket"));
if (websocket_config_l)
{
websocket_config.deserialize_json (websocket_config_l.get ());
}
json.get<uint16_t> ("peering_port", peering_port);
json.get<unsigned> ("bootstrap_fraction_numerator", bootstrap_fraction_numerator);
json.get<unsigned> ("online_weight_quorum", online_weight_quorum);
Expand Down
2 changes: 2 additions & 0 deletions nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <nano/node/ipcconfig.hpp>
#include <nano/node/logging.hpp>
#include <nano/node/stats.hpp>
#include <nano/node/websocketconfig.hpp>
#include <vector>

namespace nano
Expand Down Expand Up @@ -43,6 +44,7 @@ class node_config
bool enable_voting;
unsigned bootstrap_connections;
unsigned bootstrap_connections_max;
nano::websocket::config websocket_config;
std::string callback_address;
uint16_t callback_port;
std::string callback_target;
Expand Down
Loading