Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capi: add state snapshot #2683

Merged
merged 10 commits into from
Feb 8, 2025
Merged
4 changes: 2 additions & 2 deletions cmd/capi/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ int execute_blocks(SilkwormHandle handle, ExecuteBlocksSettings settings, const
}
});
for (auto& chain_snapshot : all_chain_snapshots) {
const int add_snapshot_status_code{silkworm_add_snapshot(handle, &chain_snapshot)};
const int add_snapshot_status_code{silkworm_add_blocks_snapshot(handle, &chain_snapshot)};
if (add_snapshot_status_code != SILKWORM_OK) {
SILK_ERROR << "silkworm_add_snapshot failed [code=" << std::to_string(add_snapshot_status_code) << "]";
SILK_ERROR << "silkworm_add_blocks_snapshot failed [code=" << std::to_string(add_snapshot_status_code) << "]";
return add_snapshot_status_code;
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/code_style.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Bad:

BlockNum expected_blocknum{previous_progress + 1};
ChainConfig& config{kMainnetConfig};
ExecutionProcessor processor(block, *rule_set, buffer, *chain_config);
ExecutionProcessor processor{block, *rule_set, buffer, *chain_config};

Copy initialization can use either style:

Expand Down
176 changes: 2 additions & 174 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,20 @@
#include <silkworm/core/execution/call_tracer.hpp>
#include <silkworm/core/execution/execution.hpp>
#include <silkworm/db/access_layer.hpp>
#include <silkworm/db/blocks/bodies/body_index.hpp>
#include <silkworm/db/blocks/headers/header_index.hpp>
#include <silkworm/db/blocks/schema_config.hpp>
#include <silkworm/db/blocks/transactions/txn_index.hpp>
#include <silkworm/db/blocks/transactions/txn_to_block_index.hpp>
#include <silkworm/db/buffer.hpp>
#include <silkworm/db/datastore/snapshots/index_builder.hpp>
#include <silkworm/db/datastore/snapshots/segment/segment_reader.hpp>
#include <silkworm/db/kv/grpc/client/remote_client.hpp>
#include <silkworm/db/stages.hpp>
#include <silkworm/db/state/schema_config.hpp>
#include <silkworm/execution/domain_state.hpp>
#include <silkworm/execution/remote_state.hpp>
#include <silkworm/execution/state_factory.hpp>
#include <silkworm/infra/common/bounded_buffer.hpp>
#include <silkworm/infra/common/directories.hpp>
#include <silkworm/infra/common/stopwatch.hpp>
#include <silkworm/infra/concurrency/context_pool_settings.hpp>
#include <silkworm/infra/concurrency/signal_handler.hpp>
#include <silkworm/infra/concurrency/spawn.hpp>
#include <silkworm/infra/concurrency/thread_pool.hpp>
#include <silkworm/infra/grpc/client/client_context_pool.hpp>
#include <silkworm/node/execution/block/block_executor.hpp>
#include <silkworm/node/stagedsync/execution_engine.hpp>
Expand All @@ -67,12 +60,6 @@

using namespace std::chrono_literals;
using namespace silkworm;
// using namespace silkworm::db;
// using namespace silkworm::rpc;

static MemoryMappedRegion make_region(const SilkwormMemoryMappedFile& mmf) {
return {mmf.memory_address, mmf.memory_length};
}

static constexpr size_t kMaxBlockBufferSize{100};
static constexpr size_t kMaxPrefetchedBlocks{10'240};
Expand Down Expand Up @@ -241,11 +228,11 @@ SILKWORM_EXPORT int silkworm_init(SilkwormHandle* handle, const struct SilkwormS
auto blocks_repository = db::blocks::make_blocks_repository(
snapshots_dir_path,
/* open = */ false,
/* index_salt = */ 0); // TODO: pass from erigon
settings->blocks_repo_index_salt);
auto state_repository = db::state::make_state_repository(
snapshots_dir_path,
/* open = */ false,
/* index_salt = */ 0); // TODO: pass from erigon
settings->state_repo_index_salt);

// NOLINTNEXTLINE(bugprone-unhandled-exception-at-new)
*handle = new SilkwormInstance{
Expand All @@ -265,165 +252,6 @@ SILKWORM_EXPORT int silkworm_init(SilkwormHandle* handle, const struct SilkwormS
return SILKWORM_OK;
}

SILKWORM_EXPORT int silkworm_build_recsplit_indexes(SilkwormHandle handle, struct SilkwormMemoryMappedFile* segments[], size_t len) SILKWORM_NOEXCEPT {
constexpr int kNeededIndexesToBuildInParallel = 2;

if (!handle) {
return SILKWORM_INVALID_HANDLE;
}

auto schema = db::blocks::make_blocks_repository_schema();

std::vector<std::shared_ptr<snapshots::IndexBuilder>> needed_indexes;
for (size_t i = 0; i < len; ++i) {
struct SilkwormMemoryMappedFile* segment = segments[i];
if (!segment) {
return SILKWORM_INVALID_SNAPSHOT;
}
auto segment_region = make_region(*segment);

const auto snapshot_path = snapshots::SnapshotPath::parse(segment->file_path);
if (!snapshot_path) {
return SILKWORM_INVALID_PATH;
}

auto names = schema.entity_name_by_path(*snapshot_path);
if (!names) {
return SILKWORM_INVALID_PATH;
}
datastore::EntityName name = names->second;
{
if (name == db::blocks::kHeaderSegmentName) {
auto index = std::make_shared<snapshots::IndexBuilder>(snapshots::HeaderIndex::make(*snapshot_path, segment_region));
needed_indexes.push_back(index);
} else if (name == db::blocks::kBodySegmentName) {
auto index = std::make_shared<snapshots::IndexBuilder>(snapshots::BodyIndex::make(*snapshot_path, segment_region));
needed_indexes.push_back(index);
} else if (name == db::blocks::kTxnSegmentName) {
auto bodies_segment_path = snapshot_path->related_path(std::string{db::blocks::kBodySegmentTag}, db::blocks::kSegmentExtension);
auto bodies_file = std::find_if(segments, segments + len, [&](SilkwormMemoryMappedFile* file) -> bool {
return snapshots::SnapshotPath::parse(file->file_path) == bodies_segment_path;
});

if (bodies_file < segments + len) {
auto bodies_segment_region = make_region(**bodies_file);

auto index = std::make_shared<snapshots::IndexBuilder>(snapshots::TransactionIndex::make(
bodies_segment_path, bodies_segment_region, *snapshot_path, segment_region));
needed_indexes.push_back(index);

index = std::make_shared<snapshots::IndexBuilder>(snapshots::TransactionToBlockIndex::make(
bodies_segment_path, bodies_segment_region, *snapshot_path, segment_region));
needed_indexes.push_back(index);
}
} else {
SILKWORM_ASSERT(false);
}
}
}

if (needed_indexes.size() < kNeededIndexesToBuildInParallel) {
// sequential build
for (const auto& index : needed_indexes) {
index->build();
}
} else {
// parallel build
ThreadPool workers;

// Create worker tasks for missing indexes
for (const auto& index : needed_indexes) {
workers.push_task([=]() {
try {
SILK_INFO << "Build index: " << index->path().filename() << " start";
index->build();
SILK_INFO << "Build index: " << index->path().filename() << " end";
} catch (const std::exception& ex) {
SILK_CRIT << "Build index: " << index->path().filename() << " failed [" << ex.what() << "]";
}
});
}

// Wait for all missing indexes to be built or stop request
while (workers.get_tasks_total()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}

// Wait for any already-started-but-unfinished work in case of stop request
workers.pause();
workers.wait_for_tasks();
}

return SILKWORM_OK;
}

SILKWORM_EXPORT int silkworm_add_snapshot(SilkwormHandle handle, SilkwormChainSnapshot* snapshot) SILKWORM_NOEXCEPT {
if (!handle || !handle->blocks_repository) {
return SILKWORM_INVALID_HANDLE;
}
if (!snapshot) {
return SILKWORM_INVALID_SNAPSHOT;
}
const SilkwormHeadersSnapshot& hs = snapshot->headers;
if (!hs.segment.file_path || !hs.header_hash_index.file_path) {
return SILKWORM_INVALID_PATH;
}
const auto headers_segment_path = snapshots::SnapshotPath::parse(hs.segment.file_path);
if (!headers_segment_path) {
return SILKWORM_INVALID_PATH;
}
snapshots::segment::SegmentFileReader header_segment{*headers_segment_path, make_region(hs.segment)};
snapshots::rec_split::AccessorIndex idx_header_hash{headers_segment_path->related_path_ext(db::blocks::kIdxExtension), make_region(hs.header_hash_index)};

const SilkwormBodiesSnapshot& bs = snapshot->bodies;
if (!bs.segment.file_path || !bs.block_num_index.file_path) {
return SILKWORM_INVALID_PATH;
}
const auto bodies_segment_path = snapshots::SnapshotPath::parse(bs.segment.file_path);
if (!bodies_segment_path) {
return SILKWORM_INVALID_PATH;
}
snapshots::segment::SegmentFileReader body_segment{*bodies_segment_path, make_region(bs.segment)};
snapshots::rec_split::AccessorIndex idx_body_number{bodies_segment_path->related_path_ext(db::blocks::kIdxExtension), make_region(bs.block_num_index)};

const SilkwormTransactionsSnapshot& ts = snapshot->transactions;
if (!ts.segment.file_path || !ts.tx_hash_index.file_path || !ts.tx_hash_2_block_index.file_path) {
return SILKWORM_INVALID_PATH;
}
const auto transactions_segment_path = snapshots::SnapshotPath::parse(ts.segment.file_path);
if (!transactions_segment_path) {
return SILKWORM_INVALID_PATH;
}
snapshots::segment::SegmentFileReader txn_segment{*transactions_segment_path, make_region(ts.segment)};
snapshots::rec_split::AccessorIndex idx_txn_hash{transactions_segment_path->related_path_ext(db::blocks::kIdxExtension), make_region(ts.tx_hash_index)};
snapshots::rec_split::AccessorIndex idx_txn_hash_2_block{transactions_segment_path->related_path(std::string{db::blocks::kIdxTxnHash2BlockTag}, db::blocks::kIdxExtension), make_region(ts.tx_hash_2_block_index)};

auto bundle_data_provider = [&]() -> snapshots::SnapshotBundleEntityData {
snapshots::SnapshotBundleEntityData data;

data.segments.emplace(db::blocks::kHeaderSegmentName, std::move(header_segment));
data.accessor_indexes.emplace(db::blocks::kIdxHeaderHashName, std::move(idx_header_hash));

data.segments.emplace(db::blocks::kBodySegmentName, std::move(body_segment));
data.accessor_indexes.emplace(db::blocks::kIdxBodyNumberName, std::move(idx_body_number));

data.segments.emplace(db::blocks::kTxnSegmentName, std::move(txn_segment));
data.accessor_indexes.emplace(db::blocks::kIdxTxnHashName, std::move(idx_txn_hash));
data.accessor_indexes.emplace(db::blocks::kIdxTxnHash2BlockName, std::move(idx_txn_hash_2_block));

return data;
};
snapshots::SnapshotBundleData bundle_data;
bundle_data.entities.emplace(snapshots::Schema::kDefaultEntityName, bundle_data_provider());

snapshots::SnapshotBundle bundle{
headers_segment_path->step_range(),
std::move(bundle_data),
};
handle->blocks_repository->add_snapshot_bundle(std::move(bundle));
return SILKWORM_OK;
}

SILKWORM_EXPORT const char* silkworm_libmdbx_version() SILKWORM_NOEXCEPT {
return ::mdbx::get_version().git.describe;
}
Expand Down
49 changes: 47 additions & 2 deletions silkworm/capi/silkworm.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,39 @@ struct SilkwormChainSnapshot {
struct SilkwormTransactionsSnapshot transactions;
};

struct SilkwormInvertedIndexSnapshot {
struct SilkwormMemoryMappedFile segment; // .ef
struct SilkwormMemoryMappedFile accessor_index; // .efi
};

struct SilkwormHistorySnapshot {
struct SilkwormMemoryMappedFile segment; // .v
struct SilkwormMemoryMappedFile accessor_index; // .vi
struct SilkwormInvertedIndexSnapshot inverted_index;
};

struct SilkwormDomainSnapshot {
struct SilkwormMemoryMappedFile segment; // .kv
struct SilkwormMemoryMappedFile existence_index; // .kvei
struct SilkwormMemoryMappedFile btree_index; // .bt
bool has_accessor_index;
struct SilkwormMemoryMappedFile accessor_index; // .kvi
bool has_history;
struct SilkwormHistorySnapshot history;
};

struct SilkwormStateSnapshot {
struct SilkwormDomainSnapshot accounts;
struct SilkwormDomainSnapshot storage;
struct SilkwormDomainSnapshot code;
struct SilkwormDomainSnapshot commitment;
struct SilkwormDomainSnapshot receipts;
struct SilkwormInvertedIndexSnapshot log_addresses;
struct SilkwormInvertedIndexSnapshot log_topics;
struct SilkwormInvertedIndexSnapshot traces_from;
struct SilkwormInvertedIndexSnapshot traces_to;
};

#define SILKWORM_PATH_SIZE 260
#define SILKWORM_GIT_VERSION_SIZE 32

Expand All @@ -120,6 +153,10 @@ struct SilkwormSettings {
char data_dir_path[SILKWORM_PATH_SIZE];
//! libmdbx version string in git describe format.
char libmdbx_version[SILKWORM_GIT_VERSION_SIZE];
//! Index salt for block snapshots
uint32_t blocks_repo_index_salt;
//! Index salt for state snapshots
uint32_t state_repo_index_salt;
};

/**
Expand All @@ -140,12 +177,20 @@ SILKWORM_EXPORT int silkworm_init(SilkwormHandle* handle, const struct SilkwormS
SILKWORM_EXPORT int silkworm_build_recsplit_indexes(SilkwormHandle handle, struct SilkwormMemoryMappedFile* segments[], size_t len) SILKWORM_NOEXCEPT;

/**
* \brief Notify Silkworm about a new snapshot to use.
* \brief Notify Silkworm about a new *block* snapshot to use.
* \param[in] handle A valid Silkworm instance handle, got with silkworm_init.
* \param[in] snapshot A snapshot to use.
* \return SILKWORM_OK (=0) on success, a non-zero error value on failure.
*/
SILKWORM_EXPORT int silkworm_add_snapshot(SilkwormHandle handle, struct SilkwormChainSnapshot* snapshot) SILKWORM_NOEXCEPT;
SILKWORM_EXPORT int silkworm_add_blocks_snapshot(SilkwormHandle handle, struct SilkwormChainSnapshot* snapshot) SILKWORM_NOEXCEPT;

/**
* \brief Notify Silkworm about a new *state* snapshot to use.
* \param[in] handle A valid Silkworm instance handle, got with silkworm_init.
* \param[in] snapshot A *state* snapshot to use.
* \return SILKWORM_OK (=0) on success, a non-zero error value on failure.
*/
SILKWORM_EXPORT int silkworm_add_state_snapshot(SilkwormHandle handle, const struct SilkwormStateSnapshot* snapshot) SILKWORM_NOEXCEPT;

/**
* \brief Get libmdbx version for compatibility checks.
Expand Down
Loading
Loading