Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket/extended confirmation support #2066

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ TEST (websocket, confirmation_options)
std::thread client_thread ([&client_thread_finished]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"accounts": ["xrb_invalid"]}})json", true, true, 1s);
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "accounts": ["xrb_invalid"]}})json", true, true, 1s);

ASSERT_FALSE (response);
client_thread_finished = true;
Expand Down Expand Up @@ -334,7 +334,7 @@ TEST (websocket, confirmation_options)
std::thread client_thread_2 ([&client_thread_2_finished]() {
// Re-subscribe with options for all local wallet accounts
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true);
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json", true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
Expand Down Expand Up @@ -375,7 +375,7 @@ TEST (websocket, confirmation_options)
std::atomic<bool> client_thread_3_finished{ false };
std::thread client_thread_3 ([&client_thread_3_finished]() {
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true, 1s);
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"confirmation_type": "active_quorum", "all_local_accounts": "true"}})json", true, true, 1s);

ASSERT_FALSE (response);
client_thread_3_finished = true;
Expand Down
1 change: 0 additions & 1 deletion nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ enum class election_status_type : uint8_t
active_confirmed_quorum = 1,
active_confirmation_height = 2,
inactive_confirmation_height = 3,
rpc_confirmation_height = 4,
stopped = 5
};

Expand Down
2 changes: 1 addition & 1 deletion nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ void nano::json_handler::block_confirm ()
else
{
// Add record in confirmation history for confirmed block
nano::election_status status{ block_l, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), nano::election_status_type::rpc_confirmation_height };
nano::election_status status{ block_l, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), nano::election_status_type::active_confirmation_height };
{
std::lock_guard<std::mutex> lock (node.active.mutex);
node.active.confirmed.push_back (status);
Expand Down
15 changes: 12 additions & 3 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ startup_time (std::chrono::steady_clock::now ())
if (websocket_server)
{
observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
assert (status_a.type != nano::election_status_type::ongoing);

if (this->websocket_server->any_subscriber (nano::websocket::topic::confirmation))
{
auto block_a (status_a.winner);
Expand All @@ -339,12 +341,19 @@ startup_time (std::chrono::steady_clock::now ())
subtype = "receive";
}
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);

this->websocket_server->broadcast_confirmation (block_a, account_a, amount_a, subtype, status_a.type);
}
}
});

observers.active_stopped.add ([this](nano::block_hash const & hash_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::stopped_election))
{
nano::websocket::message_builder builder;
this->websocket_server->broadcast (builder.stopped_election (hash_a));
}
});
}
observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
if (channel_a->get_type () == nano::transport::transport_type::udp)
Expand Down
164 changes: 152 additions & 12 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,58 @@
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>

#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>

#include <algorithm>
#include <chrono>

nano::websocket::confirmation_options::confirmation_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
node (node_a),
all_local_accounts (options_a.get<bool> ("all_local_accounts", false))
node (node_a)
{
// Non-account filtering options
include_block = options_a.get<bool> ("include_block", true);

confirmation_types = 0;
auto type_l (options_a.get<std::string> ("confirmation_type", "all"));

if (boost::iequals (type_l, "active"))
{
confirmation_types = type_all_active;
}
else if (boost::iequals (type_l, "active_quorum"))
{
confirmation_types = type_active_quorum;
}
else if (boost::iequals (type_l, "active_confirmation_height"))
{
confirmation_types = type_active_confirmation_height;
}
else if (boost::iequals (type_l, "inactive"))
{
confirmation_types = type_inactive;
}
else
{
confirmation_types = type_all;
}

// Account filtering options
auto all_local_accounts_l (options_a.get_optional<bool> ("all_local_accounts"));
if (all_local_accounts_l.is_initialized ())
{
all_local_accounts = all_local_accounts_l.get ();
has_account_filtering_options = true;

if (!include_block)
{
node.logger.always_log ("Websocket: Filtering option \"all_local_accounts\" requires that \"include_block\" is set to true to be effective");
}
}
auto accounts_l (options_a.get_child_optional ("accounts"));
if (accounts_l)
{
has_account_filtering_options = true;
for (auto account_l : *accounts_l)
{
nano::account result_l (0);
Expand All @@ -26,6 +66,11 @@ all_local_accounts (options_a.get<bool> ("all_local_accounts", false))
node.logger.always_log ("Websocket: invalid account provided for filtering blocks: ", account_l.second.data ());
}
}

if (!include_block)
{
node.logger.always_log ("Websocket: Filtering option \"accounts\" requires that \"include_block\" is set to true to be effective");
}
}
// Warn the user if the options resulted in an empty filter
if (!all_local_accounts && accounts.empty ())
Expand All @@ -36,7 +81,23 @@ all_local_accounts (options_a.get<bool> ("all_local_accounts", false))

bool nano::websocket::confirmation_options::should_filter (nano::websocket::message const & message_a) const
{
bool should_filter_l (true);
bool should_filter_conf_type_l (true);

auto type_text_l (message_a.contents.get<std::string> ("confirmation_type"));
if (type_text_l == "active_quorum" && confirmation_types & type_active_quorum)
{
should_filter_conf_type_l = false;
}
else if (type_text_l == "active_confirmation_height" && confirmation_types & type_active_confirmation_height)
{
should_filter_conf_type_l = false;
}
else if (type_text_l == "inactive" && confirmation_types & type_inactive)
{
should_filter_conf_type_l = false;
}

bool should_filter_account (has_account_filtering_options);
auto destination_opt_l (message_a.contents.get_optional<std::string> ("message.block.link_as_account"));
if (destination_opt_l)
{
Expand All @@ -50,15 +111,16 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess
assert (decode_source_ok_l && decode_destination_ok_l);
if (node.wallets.exists (transaction_l, source_l) || node.wallets.exists (transaction_l, destination_l))
{
should_filter_l = false;
should_filter_account = false;
}
}
if (accounts.find (source_text_l) != accounts.end () || accounts.find (destination_opt_l.get ()) != accounts.end ())
{
should_filter_l = false;
should_filter_account = false;
}
}
return should_filter_l;

return should_filter_conf_type_l || should_filter_account;
}

nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a, nano::node & node_a) :
Expand Down Expand Up @@ -242,6 +304,10 @@ nano::websocket::topic to_topic (std::string topic_a)
{
topic = nano::websocket::topic::confirmation;
}
else if (topic_a == "stopped_election")
{
topic = nano::websocket::topic::stopped_election;
}
else if (topic_a == "vote")
{
topic = nano::websocket::topic::vote;
Expand All @@ -260,6 +326,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "confirmation";
}
else if (topic_a == nano::websocket::topic::stopped_election)
{
topic = "stopped_election";
}
else if (topic_a == nano::websocket::topic::vote)
{
topic = "vote";
Expand Down Expand Up @@ -416,6 +486,43 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec)
}
}

void nano::websocket::listener::broadcast_confirmation (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, nano::election_status_type election_status_type_a)
{
nano::websocket::message_builder builder;

std::lock_guard<std::mutex> lk (sessions_mutex);
boost::optional<nano::websocket::message> msg_with_block;
boost::optional<nano::websocket::message> msg_without_block;
for (auto & weak_session : sessions)
{
auto session_ptr (weak_session.lock ());
if (session_ptr)
{
auto subscription (session_ptr->subscriptions.find (nano::websocket::topic::confirmation));
if (subscription != session_ptr->subscriptions.end ())
{
auto conf_options (dynamic_cast<nano::websocket::confirmation_options *> (subscription->second.get ()));
auto include_block (conf_options == nullptr ? true : conf_options->get_include_block ());

if (include_block && !msg_with_block)
{
msg_with_block = builder.block_confirmed (block_a, account_a, amount_a, subtype, include_block, election_status_type_a);
}
else if (!include_block && !msg_without_block)
{
msg_without_block = builder.block_confirmed (block_a, account_a, amount_a, subtype, include_block, election_status_type_a);
}
else
{
assert (false);
}

session_ptr->write (include_block ? msg_with_block.get () : msg_without_block.get ());
}
}
}
}

void nano::websocket::listener::broadcast (nano::websocket::message message_a)
{
std::lock_guard<std::mutex> lk (sessions_mutex);
Expand All @@ -441,7 +548,18 @@ void nano::websocket::listener::decrease_subscriber_count (nano::websocket::topi
count -= 1;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
nano::websocket::message nano::websocket::message_builder::stopped_election (nano::block_hash const & hash_a)
{
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);

boost::property_tree::ptree message_node_l;
message_node_l.add ("hash", hash_a.to_string ());

return message_l;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block_a, nano::election_status_type election_status_type_a)
{
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);
Expand All @@ -451,13 +569,35 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
message_node_l.add ("account", account_a.to_account ());
message_node_l.add ("amount", amount_a.to_string_dec ());
message_node_l.add ("hash", block_a->hash ().to_string ());
boost::property_tree::ptree block_node_l;
block_a->serialize_json (block_node_l);
if (!subtype.empty ())

std::string confirmation_type = "unknown";
switch (election_status_type_a)
{
block_node_l.add ("subtype", subtype);
case nano::election_status_type::active_confirmed_quorum:
confirmation_type = "active_quorum";
break;
case nano::election_status_type::active_confirmation_height:
confirmation_type = "active_confirmation_height";
break;
case nano::election_status_type::inactive_confirmation_height:
confirmation_type = "inactive";
break;
default:
break;
};
message_l.contents.add ("confirmation_type", confirmation_type);

if (include_block_a)
{
boost::property_tree::ptree block_node_l;
block_a->serialize_json (block_node_l);
if (!subtype.empty ())
{
block_node_l.add ("subtype", subtype);
}
message_node_l.add_child ("block", block_node_l);
}
message_node_l.add_child ("block", block_node_l);

message_l.contents.add_child ("message", message_node_l);

return message_l;
Expand Down
Loading