Skip to content

Commit

Permalink
GH-592 Merge GH-624 changes into split-ship-log branch.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Jan 9, 2023
1 parent a9cea07 commit e75b2dd
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <eosio/state_history/serialization.hpp>

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/beast/websocket.hpp>

Expand All @@ -21,18 +22,18 @@ namespace bio = boost::iostreams;

template <typename Session>
struct send_queue_entry_base {
virtual ~send_queue_entry_base() {}
virtual ~send_queue_entry_base() = default;
virtual void send(Session* s) = 0;
};

template <typename Session>
struct basic_send_queue_entry : send_queue_entry_base<Session> {
std::vector<char> data;
template <typename... Args>
basic_send_queue_entry(Args&&... args)
explicit basic_send_queue_entry(Args&&... args)
: data(std::forward<Args>(args)...) {}
void send(Session* s) override {
s->socket_stream.async_write(boost::asio::buffer(data),
s->socket_stream->async_write(boost::asio::buffer(data),
[s = s->shared_from_this()](boost::system::error_code ec, size_t) {
s->callback(ec, "async_write", [s] { s->pop_entry(); });
});
Expand All @@ -47,7 +48,7 @@ class blocks_result_send_queue_entry : public send_queue_entry_base<Session> {

template <typename Next>
void async_send(Session* s, bool fin, const std::vector<char>& d, Next&& next) {
s->socket_stream.async_write_some(
s->socket_stream->async_write_some(
fin, boost::asio::buffer(d),
[s = s->shared_from_this(), next = std::forward<Next>(next)](boost::system::error_code ec, size_t) mutable {
s->callback(ec, "async_write", [s, next = std::move(next)]() mutable { next(s.get()); });
Expand All @@ -61,7 +62,7 @@ class blocks_result_send_queue_entry : public send_queue_entry_base<Session> {
data.resize(size);
bool eof = (locked_strm.buf.sgetc() == EOF);

s->socket_stream.async_write_some(
s->socket_stream->async_write_some(
fin && eof, boost::asio::buffer(data),
[this, s = s->shared_from_this(), fin, &locked_strm, eof,
next = std::forward<Next>(next)](boost::system::error_code ec, size_t) mutable {
Expand Down Expand Up @@ -145,11 +146,14 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
using entry_ptr = std::unique_ptr<send_queue_entry_base<session>>;

Plugin plugin;
ws::stream<SocketType> socket_stream;
bool sending = false;
bool sent_abi = false;
std::vector<entry_ptr> send_queue;
bool need_to_send_update = false;
std::optional<ws::stream<SocketType>> socket_stream; // ship thread only after creation
bool sending = false; // ship thread only
std::vector<entry_ptr> send_queue; // ship thread only

bool need_to_send_update = false; // main thread only
uint32_t to_send_block_num = 0; // main thread only
std::optional<std::vector<block_position>::const_iterator> position_it; // main thread only

const int32_t default_frame_size;

session(Plugin plugin, SocketType socket)
Expand All @@ -159,33 +163,40 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

void start() {
fc_ilog(plugin->logger(), "incoming connection");
socket_stream.auto_fragment(false);
socket_stream.binary(true);
socket_stream->auto_fragment(false);
socket_stream->binary(true);
if constexpr (std::is_same_v<SocketType, tcp::socket>) {
socket_stream.next_layer().set_option(boost::asio::ip::tcp::no_delay(true));
socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(true));
}
socket_stream.next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
socket_stream.next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024));
socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024));

socket_stream.async_accept([self = this->shared_from_this()](boost::system::error_code ec) {
socket_stream->async_accept([self = this->shared_from_this()](boost::system::error_code ec) {
self->callback(ec, "async_accept", [self] {
self->start_read();
self->send_abi();
self->socket_stream->binary(false);
self->socket_stream->async_write(
boost::asio::buffer(state_history_plugin_abi, strlen(state_history_plugin_abi)),
[self](boost::system::error_code ec, size_t) {
self->callback(ec, "async_write", [self] {
self->socket_stream->binary(true);
self->start_read();
});
});
});
});
}

void start_read() {
auto in_buffer = std::make_shared<boost::beast::flat_buffer>();
socket_stream.async_read(
socket_stream->async_read(
*in_buffer, [self = this->shared_from_this(), in_buffer](boost::system::error_code ec, size_t) {
self->callback(ec, "async_read", [self, in_buffer] {
auto d = boost::asio::buffer_cast<char const*>(boost::beast::buffers_front(in_buffer->data()));
auto s = boost::asio::buffer_size(in_buffer->data());
fc::datastream<const char*> ds(d, s);
state_request req;
fc::raw::unpack(ds, req);
self->plugin->post_task([self, req = std::move(req)]() mutable { std::visit(*self, req); });
self->plugin->post_task_main_thread_medium([self, req = std::move(req)]() mutable { std::visit(*self, req); });
self->start_read();
});
});
Expand All @@ -195,35 +206,27 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
if (sending)
return;
if (send_queue.empty()) {
plugin->post_task(
[self = this->shared_from_this()]() { self->send_update(); });
plugin->post_task_main_thread_medium([self = this->shared_from_this()]() {
self->send_update();
});
return;
}
sending = true;
socket_stream.binary(sent_abi);
sent_abi = true;
send_queue[0]->send(this);
}

void send_abi() {
boost::asio::post(this->plugin->work_strand, [self = this->shared_from_this()]() {
std::string_view str(state_history_plugin_abi);
self->send_queue.push_back(std::make_unique<basic_send_queue_entry<session>>(str.begin(), str.end()));
self->send();
});
send_queue[0]->send(this);
}

template <typename T>
void send(T obj) {
boost::asio::post(this->plugin->work_strand, [self = this->shared_from_this(), obj = std::move(obj)]() mutable {
boost::asio::post(this->plugin->get_ship_executor(), [self = this->shared_from_this(), obj = std::move(obj)]() mutable {
self->send_queue.push_back(
std::make_unique<basic_send_queue_entry<session>>(fc::raw::pack(state_result{std::move(obj)})));
self->send();
});
}

void send(get_blocks_result_v0&& obj) {
boost::asio::post(this->plugin->work_strand, [self = this->shared_from_this(), obj = std::move(obj)]() mutable {
boost::asio::post(this->plugin->get_ship_executor(), [self = this->shared_from_this(), obj = std::move(obj)]() mutable {
self->send_queue.push_back(std::make_unique<blocks_result_send_queue_entry<session>>(std::move(obj)));
self->send();
});
Expand Down Expand Up @@ -271,8 +274,10 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
send(std::move(result));
}

// called from main thread
void operator()(get_blocks_request_v0& req) {
fc_ilog(plugin->logger(), "received get_blocks_request_v0 = ${req}", ("req", req));
to_send_block_num = req.start_block_num;
for (auto& cp : req.have_positions) {
if (req.start_block_num <= cp.block_num)
continue;
Expand All @@ -281,19 +286,26 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
req.start_block_num = std::min(req.start_block_num, cp.block_num);

if (!id) {
to_send_block_num = std::min(to_send_block_num, cp.block_num);
fc_dlog(plugin->logger(), "block ${block_num} is not available", ("block_num", cp.block_num));
} else if (*id != cp.block_id) {
to_send_block_num = std::min(to_send_block_num, cp.block_num);
fc_dlog(plugin->logger(),
"the id for block ${block_num} in block request have_positions does not match the existing",
("block_num", cp.block_num));
}
}
req.have_positions.clear();
fc_dlog(plugin->logger(), " get_blocks_request_v0 start_block_num set to ${num}", ("num", req.start_block_num));
current_request = req;
fc_dlog(plugin->logger(), " get_blocks_request_v0 start_block_num set to ${num}", ("num", to_send_block_num));

if( !req.have_positions.empty() ) {
position_it = req.have_positions.begin();
}

current_request = std::move(req);
send_update(true);
}

// called from main thread
void operator()(get_blocks_ack_request_v0& req) {
fc_ilog(plugin->logger(), "received get_blocks_ack_request_v0 = ${req}", ("req", req));
if (!current_request) {
Expand All @@ -304,39 +316,56 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
send_update();
}

// called from main thread
void send_update(get_blocks_result_v0 result, const chain::block_state_ptr& block_state) {
need_to_send_update = true;
if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
if (!current_request || !current_request->max_messages_in_flight)
return;

result.last_irreversible = plugin->get_last_irreversible();
uint32_t current =
current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;

if (current_request->start_block_num <= current &&
current_request->start_block_num < current_request->end_block_num) {
auto block_id = plugin->get_block_id(current_request->start_block_num);

if (block_id) {
result.this_block = block_position{current_request->start_block_num, *block_id};
auto prev_block_id = plugin->get_block_id(current_request->start_block_num - 1);
if (prev_block_id)
result.prev_block = block_position{current_request->start_block_num - 1, *prev_block_id};
if (current_request->fetch_block)
plugin->get_block(current_request->start_block_num, block_state, result.block);
if (current_request->fetch_traces && plugin->get_trace_log())
result.traces.emplace();
if (current_request->fetch_deltas && plugin->get_chain_state_log())
result.deltas.emplace();
current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;

if (to_send_block_num > current || to_send_block_num >= current_request->end_block_num) {
fc_dlog( plugin->logger(), "Not sending, to_send_block_num: ${s}, current: ${c} current_request.end_block_num: ${b}",
("s", to_send_block_num)("c", current)("b", current_request->end_block_num) );
return;
}

auto block_id = plugin->get_block_id(to_send_block_num);

if (block_id && position_it && (*position_it)->block_num == to_send_block_num) {
// This branch happens when the head block of nodeos is behind the head block of connecting client.
// In addition, the client told us the corresponding block id for block_num we are going to send.
// We can send the block when the block_id is different.
auto& itr = *position_it;
auto block_id_seen_by_client = itr->block_id;
++itr;
if (itr == current_request->have_positions.end())
position_it.reset();

if(block_id_seen_by_client == *block_id) {
++to_send_block_num;
return;
}
++current_request->start_block_num;
}

auto& block_num = current_request->start_block_num;
auto timestamp = plugin->get_block_timestamp(block_num, block_state);
if (block_id) {
result.this_block = block_position{to_send_block_num, *block_id};
auto prev_block_id = plugin->get_block_id(to_send_block_num - 1);
if (prev_block_id)
result.prev_block = block_position{to_send_block_num - 1, *prev_block_id};
if (current_request->fetch_block)
plugin->get_block(to_send_block_num, block_state, result.block);
if (current_request->fetch_traces && plugin->get_trace_log())
result.traces.emplace();
if (current_request->fetch_deltas && plugin->get_chain_state_log())
result.deltas.emplace();
}
++to_send_block_num;

// during syncing if block is older than 5 min, log every 1000th block
bool fresh_block = timestamp && fc::time_point::now() - *timestamp < fc::minutes(5);
bool fresh_block = fc::time_point::now() - plugin->get_head_block_timestamp() < fc::minutes(5);
if (fresh_block || (result.this_block && result.this_block->block_num % 1000 == 0)) {
fc_ilog(plugin->logger(),
"pushing result "
Expand All @@ -348,31 +377,44 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

send(std::move(result));
--current_request->max_messages_in_flight;
need_to_send_update = current_request->start_block_num <= current &&
current_request->start_block_num < current_request->end_block_num;
need_to_send_update = to_send_block_num <= current &&
to_send_block_num < current_request->end_block_num;
}

void send_update(const chain::block_state_ptr& block_state) {
need_to_send_update = true;
if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
// called from main thread
void send_update(const chain::block_state_ptr& block_state) override {
if (!current_request || !current_request->max_messages_in_flight)
return;

get_blocks_result_v0 result;
result.head = {block_state->block_num, block_state->id};
send_update(std::move(result), block_state);
}

// called from main thread
void send_update(bool changed = false) {
if (changed)
need_to_send_update = true;
if (!send_queue.empty() || !need_to_send_update || !current_request || !current_request->max_messages_in_flight)
return;
get_blocks_result_v0 result;
result.head = plugin->get_block_head();
send_update(std::move(result), {});
if (changed || need_to_send_update) {
get_blocks_result_v0 result;
result.head = plugin->get_block_head();
send_update(std::move(result), {});
}
}

template <typename F>
void catch_and_close(F f) {
void callback(boost::system::error_code ec, const char* what, F f) {
if (plugin->stopping)
return;

if (ec) {
if (ec == boost::asio::error::eof || ec == boost::beast::websocket::error::closed) {
fc_dlog(plugin->logger(), "${w}: ${m}", ("w", what)("m", ec.message()));
} else {
fc_elog(plugin->logger(), "${w}: ${m}", ("w", what)("m", ec.message()));
}
close_i();
return;
}

try {
f();
} catch (const fc::exception& e) {
Expand All @@ -387,38 +429,23 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}
}

template <typename F>
void callback(boost::system::error_code ec, const char* what, F f) {
plugin->post_task([=]() {
if (plugin->stopping)
return;
if (ec) {
return on_fail(ec, what);
}
catch_and_close(f);
void close() override {
boost::asio::post(plugin->get_ship_executor(), [self = this->shared_from_this()]() {
self->close_i();
});
}

void on_fail(boost::system::error_code ec, const char* what) {
try {
if (ec == boost::asio::error::eof || ec == boost::beast::websocket::error::closed) {
fc_dlog(plugin->logger(), "${w}: ${m}", ("w", what)("m", ec.message()));
} else {
fc_elog(plugin->logger(), "${w}: ${m}", ("w", what)("m", ec.message()));
}
close();
} catch (...) {
fc_elog(plugin->logger(), "uncaught exception on close");
}
}

void close() {
// called from ship thread
void close_i() {
boost::system::error_code ec;
socket_stream.next_layer().close(ec);
socket_stream->next_layer().close(ec);
if (ec) {
fc_elog(plugin->logger(), "close: ${m}", ("m", ec.message()));
}
plugin->remove_session(this->shared_from_this());
socket_stream.reset();
plugin->post_task_main_thread_high([self = this->shared_from_this(), plugin=plugin]() {
plugin->session_set.erase(self);
});
}
};
} // namespace eosio
Loading

0 comments on commit e75b2dd

Please sign in to comment.