Skip to content

Commit

Permalink
Websocket/extended confirmation support (#2066)
Browse files Browse the repository at this point in the history
* WS: Add support for confirmation without block contents

* Confirmation type filtering

* Remove debugging code

* Better comments

* Use if/else
  • Loading branch information
cryptocode authored and Russel Waters committed Jun 11, 2019
1 parent 8af51b9 commit 0efcaa3
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 26 deletions.
6 changes: 3 additions & 3 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ TEST (websocket, confirmation_options)
std::thread client_thread ([&client_thread_finished]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"accounts": ["xrb_invalid"]}})json", true, true, 1s);
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json", true, true, 1s);

ASSERT_FALSE (response);
client_thread_finished = true;
Expand Down Expand Up @@ -334,7 +334,7 @@ TEST (websocket, confirmation_options)
std::thread client_thread_2 ([&client_thread_2_finished]() {
// Re-subscribe with options for all local wallet accounts
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true);
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json", true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
Expand Down Expand Up @@ -375,7 +375,7 @@ TEST (websocket, confirmation_options)
std::atomic<bool> client_thread_3_finished{ false };
std::thread client_thread_3 ([&client_thread_3_finished]() {
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true, 1s);
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json", true, true, 1s);

ASSERT_FALSE (response);
client_thread_3_finished = true;
Expand Down
1 change: 0 additions & 1 deletion nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ enum class election_status_type : uint8_t
active_confirmed_quorum = 1,
active_confirmation_height = 2,
inactive_confirmation_height = 3,
rpc_confirmation_height = 4,
stopped = 5
};

Expand Down
2 changes: 1 addition & 1 deletion nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ void nano::json_handler::block_confirm ()
else
{
// Add record in confirmation history for confirmed block
nano::election_status status{ block_l, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), nano::election_status_type::rpc_confirmation_height };
nano::election_status status{ block_l, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), nano::election_status_type::active_confirmation_height };
{
std::lock_guard<std::mutex> lock (node.active.mutex);
node.active.confirmed.push_back (status);
Expand Down
15 changes: 12 additions & 3 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ startup_time (std::chrono::steady_clock::now ())
if (websocket_server)
{
observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
assert (status_a.type != nano::election_status_type::ongoing);

if (this->websocket_server->any_subscriber (nano::websocket::topic::confirmation))
{
auto block_a (status_a.winner);
Expand All @@ -339,12 +341,19 @@ startup_time (std::chrono::steady_clock::now ())
subtype = "receive";
}
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);

this->websocket_server->broadcast_confirmation (block_a, account_a, amount_a, subtype, status_a.type);
}
}
});

observers.active_stopped.add ([this](nano::block_hash const & hash_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::stopped_election))
{
nano::websocket::message_builder builder;
this->websocket_server->broadcast (builder.stopped_election (hash_a));
}
});
}
observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
if (channel_a->get_type () == nano::transport::transport_type::udp)
Expand Down
164 changes: 152 additions & 12 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,58 @@
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>

#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>

#include <algorithm>
#include <chrono>

nano::websocket::confirmation_options::confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
node (node_a),
all_local_accounts (options_a.get<bool> ("all_local_accounts", false))
node (node_a)
{
// Non-account filtering options
include_block = options_a.get<bool> ("include_block", true);

confirmation_types = 0;
auto type_l (options_a.get<std::string> ("confirmation_type", "all"));

if (boost::iequals (type_l, "active"))
{
confirmation_types = type_all_active;
}
else if (boost::iequals (type_l, "active_quorum"))
{
confirmation_types = type_active_quorum;
}
else if (boost::iequals (type_l, "active_confirmation_height"))
{
confirmation_types = type_active_confirmation_height;
}
else if (boost::iequals (type_l, "inactive"))
{
confirmation_types = type_inactive;
}
else
{
confirmation_types = type_all;
}

// Account filtering options
auto all_local_accounts_l (options_a.get_optional<bool> ("all_local_accounts"));
if (all_local_accounts_l.is_initialized ())
{
all_local_accounts = all_local_accounts_l.get ();
has_account_filtering_options = true;

if (!include_block)
{
node.logger.always_log ("Websocket: Filtering option \"all_local_accounts\" requires that \"include_block\" is set to true to be effective");
}
}
auto accounts_l (options_a.get_child_optional ("accounts"));
if (accounts_l)
{
has_account_filtering_options = true;
for (auto account_l : *accounts_l)
{
nano::account result_l (0);
Expand All @@ -26,6 +66,11 @@ all_local_accounts (options_a.get<bool> ("all_local_accounts", false))
node.logger.always_log ("Websocket: invalid account provided for filtering blocks: ", account_l.second.data ());
}
}

if (!include_block)
{
node.logger.always_log ("Websocket: Filtering option \"accounts\" requires that \"include_block\" is set to true to be effective");
}
}
// Warn the user if the options resulted in an empty filter
if (!all_local_accounts && accounts.empty ())
Expand All @@ -36,7 +81,23 @@ all_local_accounts (options_a.get<bool> ("all_local_accounts", false))

bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
bool should_filter_conf_type_l (true);

auto type_text_l (message_a.contents.get<std::string> ("confirmation_type"));
if (type_text_l == "active_quorum" && confirmation_types & type_active_quorum)
{
should_filter_conf_type_l = false;
}
else if (type_text_l == "active_confirmation_height" && confirmation_types & type_active_confirmation_height)
{
should_filter_conf_type_l = false;
}
else if (type_text_l == "inactive" && confirmation_types & type_inactive)
{
should_filter_conf_type_l = false;
}

bool should_filter_account (has_account_filtering_options);
auto destination_opt_l (message_a.contents.get_optional<std::string> ("message.block.link_as_account"));
if (destination_opt_l)
{
Expand All @@ -50,15 +111,16 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess
assert (decode_source_ok_l && decode_destination_ok_l);
if (node.wallets.exists (transaction_l, source_l) || node.wallets.exists (transaction_l, destination_l))
{
should_filter_l = false;
should_filter_account = false;
}
}
if (accounts.find (source_text_l) != accounts.end () || accounts.find (destination_opt_l.get ()) != accounts.end ())
{
should_filter_l = false;
should_filter_account = false;
}
}
return should_filter_l;

return should_filter_conf_type_l || should_filter_account;
}

nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
Expand Down Expand Up @@ -242,6 +304,10 @@ nano::websocket::topic to_topic (std::string topic_a)
{
topic = nano::websocket::topic::confirmation;
}
else if (topic_a == "stopped_election")
{
topic = nano::websocket::topic::stopped_election;
}
else if (topic_a == "vote")
{
topic = nano::websocket::topic::vote;
Expand All @@ -260,6 +326,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "confirmation";
}
else if (topic_a == nano::websocket::topic::stopped_election)
{
topic = "stopped_election";
}
else if (topic_a == nano::websocket::topic::vote)
{
topic = "vote";
Expand Down Expand Up @@ -416,6 +486,43 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec)
}
}

void nano::websocket::listener::broadcast_confirmation (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, nano::election_status_type election_status_type_a)
{
nano::websocket::message_builder builder;

std::lock_guard<std::mutex> lk (sessions_mutex);
boost::optional<nano::websocket::message> msg_with_block;
boost::optional<nano::websocket::message> msg_without_block;
for (auto & weak_session : sessions)
{
auto session_ptr (weak_session.lock ());
if (session_ptr)
{
auto subscription (session_ptr->subscriptions.find (nano::websocket::topic::confirmation));
if (subscription != session_ptr->subscriptions.end ())
{
auto conf_options (dynamic_cast<nano::websocket::confirmation_options *> (subscription->second.get ()));
auto include_block (conf_options == nullptr ? true : conf_options->get_include_block ());

if (include_block && !msg_with_block)
{
msg_with_block = builder.block_confirmed (block_a, account_a, amount_a, subtype, include_block, election_status_type_a);
}
else if (!include_block && !msg_without_block)
{
msg_without_block = builder.block_confirmed (block_a, account_a, amount_a, subtype, include_block, election_status_type_a);
}
else
{
assert (false);
}

session_ptr->write (include_block ? msg_with_block.get () : msg_without_block.get ());
}
}
}
}

void nano::websocket::listener::broadcast (nano::websocket::message message_a)
{
std::lock_guard<std::mutex> lk (sessions_mutex);
Expand All @@ -441,7 +548,18 @@ void nano::websocket::listener::decrease_subscriber_count (nano::websocket::topi
count -= 1;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
nano::websocket::message nano::websocket::message_builder::stopped_election (nano::block_hash const & hash_a)
{
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);

boost::property_tree::ptree message_node_l;
message_node_l.add ("hash", hash_a.to_string ());

return message_l;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block_a, nano::election_status_type election_status_type_a)
{
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);
Expand All @@ -451,13 +569,35 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
message_node_l.add ("account", account_a.to_account ());
message_node_l.add ("amount", amount_a.to_string_dec ());
message_node_l.add ("hash", block_a->hash ().to_string ());
boost::property_tree::ptree block_node_l;
block_a->serialize_json (block_node_l);
if (!subtype.empty ())

std::string confirmation_type = "unknown";
switch (election_status_type_a)
{
block_node_l.add ("subtype", subtype);
case nano::election_status_type::active_confirmed_quorum:
confirmation_type = "active_quorum";
break;
case nano::election_status_type::active_confirmation_height:
confirmation_type = "active_confirmation_height";
break;
case nano::election_status_type::inactive_confirmation_height:
confirmation_type = "inactive";
break;
default:
break;
};
message_l.contents.add ("confirmation_type", confirmation_type);

if (include_block_a)
{
boost::property_tree::ptree block_node_l;
block_a->serialize_json (block_node_l);
if (!subtype.empty ())
{
block_node_l.add ("subtype", subtype);
}
message_node_l.add_child ("block", block_node_l);
}
message_node_l.add_child ("block", block_node_l);

message_l.contents.add_child ("message", message_node_l);

return message_l;
Expand Down
Loading

0 comments on commit 0efcaa3

Please sign in to comment.