diff --git a/Builds/CMake/FindMySQL.cmake b/Builds/CMake/FindMySQL.cmake new file mode 100644 index 000000000..7e1da1ec9 --- /dev/null +++ b/Builds/CMake/FindMySQL.cmake @@ -0,0 +1,48 @@ +# - Find MySQL +find_path(MYSQL_INCLUDE_DIR + NAMES mysql.h + PATHS + /usr/include/mysql + /usr/local/include/mysql + /opt/mysql/mysql/include + DOC "MySQL include directory" +) + +find_library(MYSQL_LIBRARY + NAMES mysqlclient + PATHS + /usr/lib + /usr/lib/x86_64-linux-gnu + /usr/lib/mysql + /usr/local/lib/mysql + /opt/mysql/mysql/lib + DOC "MySQL client library" +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(MySQL + REQUIRED_VARS + MYSQL_LIBRARY + MYSQL_INCLUDE_DIR +) + +if(MYSQL_FOUND) + set(MYSQL_INCLUDE_DIRS ${MYSQL_INCLUDE_DIR}) + set(MYSQL_LIBRARIES ${MYSQL_LIBRARY}) + + # Create an imported target + if(NOT TARGET MySQL::MySQL) + add_library(MySQL::MySQL UNKNOWN IMPORTED) + set_target_properties(MySQL::MySQL PROPERTIES + IMPORTED_LOCATION "${MYSQL_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQL_INCLUDE_DIR}" + ) + endif() + + mark_as_advanced(MYSQL_INCLUDE_DIR MYSQL_LIBRARY) +else() + message(FATAL_ERROR "Could not find MySQL development files") +endif() + +message(STATUS "Using MySQL include dir: ${MYSQL_INCLUDE_DIR}") +message(STATUS "Using MySQL library: ${MYSQL_LIBRARY}") diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 78843991f..2bdc1b050 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -540,6 +540,7 @@ target_sources (rippled PRIVATE #]===============================] src/ripple/nodestore/backend/CassandraFactory.cpp src/ripple/nodestore/backend/RWDBFactory.cpp + src/ripple/nodestore/backend/MySQLFactory.cpp src/ripple/nodestore/backend/MemoryFactory.cpp src/ripple/nodestore/backend/FlatmapFactory.cpp src/ripple/nodestore/backend/NuDBFactory.cpp diff --git a/Builds/CMake/deps/MySQL.cmake b/Builds/CMake/deps/MySQL.cmake new file mode 100644 index 000000000..a814360b6 --- /dev/null +++ b/Builds/CMake/deps/MySQL.cmake @@ -0,0 +1,56 @@ +#[===================================================================[ + dep: MySQL + MySQL client library integration for rippled (static linking) +#]===================================================================] +# Create an IMPORTED target for MySQL +add_library(mysql_client UNKNOWN IMPORTED) + +# Find MySQL client library and headers +find_path(MYSQL_INCLUDE_DIR + NAMES mysql.h + PATHS + /usr/include/mysql + /usr/local/include/mysql + /opt/mysql/mysql/include + DOC "MySQL include directory" +) + +# Modified to specifically look for static library +find_library(MYSQL_LIBRARY + NAMES libmysqlclient.a mysqlclient.a # Look for static libraries first + PATHS + /usr/lib + /usr/lib/x86_64-linux-gnu + /usr/lib/mysql + /usr/local/lib/mysql + /opt/mysql/mysql/lib + DOC "MySQL client static library" + NO_DEFAULT_PATH # Prevents finding dynamic library first +) + +# Set properties on the imported target +if(MYSQL_INCLUDE_DIR AND MYSQL_LIBRARY) + set_target_properties(mysql_client PROPERTIES + IMPORTED_LOCATION "${MYSQL_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQL_INCLUDE_DIR}" + IMPORTED_LINK_INTERFACE_LANGUAGES "CXX" # Added for static linking + IMPORTED_LINK_INTERFACE_MULTIPLICITY "1" # Added for static linking + ) + message(STATUS "Found MySQL include dir: ${MYSQL_INCLUDE_DIR}") + message(STATUS "Found MySQL library: ${MYSQL_LIBRARY}") +else() + message(FATAL_ERROR "Could not find MySQL static development files. Please install libmysqlclient-dev") +endif() + +# Add MySQL backend source to rippled sources +list(APPEND rippled_src + src/ripple/nodestore/backend/MySQLBackend.cpp) + +# Link MySQL to rippled +target_link_libraries(ripple_libs + INTERFACE + mysql_client +) + +# Create an alias target for consistency with other deps +add_library(deps::mysql ALIAS mysql_client) diff --git a/CMakeLists.txt b/CMakeLists.txt index d62541fad..8a8acc601 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,7 @@ include(deps/gRPC) include(deps/cassandra) include(deps/Postgres) include(deps/WasmEdge) +include(deps/MySQL) ### diff --git a/build-full.sh b/build-full.sh index 3ae0251d7..c8d68935c 100755 --- a/build-full.sh +++ b/build-full.sh @@ -69,7 +69,14 @@ fi mkdir .nih_c; mkdir .nih_toolchain; cd .nih_toolchain && -yum install -y wget lz4 lz4-devel git llvm13-static.x86_64 llvm13-devel.x86_64 devtoolset-10-binutils zlib-static ncurses-static -y \ +(cat > /etc/yum.repos.d/MariaDB.repo << EOF +[mariadb] +name = MariaDB +baseurl = http://yum.mariadb.org/10.5/centos7-amd64 +gpgkey=https://yum.mariadb.org/RPM-GPG-KEY-MariaDB +gpgcheck=1 +EOF ) && +yum install -y wget lz4 lz4-devel git llvm13-static.x86_64 llvm13-devel.x86_64 devtoolset-10-binutils zlib-static ncurses-static MariaDB-devel MariaDB-shared -y \ devtoolset-7-gcc-c++ \ devtoolset-9-gcc-c++ \ devtoolset-10-gcc-c++ \ diff --git a/src/ripple/app/rdb/backend/MySQLDatabase.h b/src/ripple/app/rdb/backend/MySQLDatabase.h new file mode 100644 index 000000000..6b89bf7db --- /dev/null +++ b/src/ripple/app/rdb/backend/MySQLDatabase.h @@ -0,0 +1,1639 @@ +#ifndef RIPPLE_APP_RDB_BACKEND_MYSQLDATABASE_H_INCLUDED +#define RIPPLE_APP_RDB_BACKEND_MYSQLDATABASE_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +struct MySQLDeleter +{ + void + operator()(MYSQL* mysql) + { + if (mysql) + { + mysql_close(mysql); + } + } +}; + +// Thread-local MySQL connection +static thread_local std::unique_ptr threadLocalMySQL_; + +class MySQLDatabase : public SQLiteDatabase +{ +private: + Application& app_; + bool const useTxTables_; + + // Configuration for creating new connections + struct MySQLConfig + { + std::string host; + std::string user; + std::string pass; + std::string name; + unsigned int port; + }; + MySQLConfig config_; + + MYSQL* + initializeConnection() + { + MYSQL* mysql = mysql_init(nullptr); + if (!mysql) + { + throw std::runtime_error("Failed to initialize MySQL"); + } + + if (!mysql_real_connect( + mysql, + config_.host.c_str(), + config_.user.c_str(), + config_.pass.c_str(), + nullptr, // Don't select database in connection + config_.port, + nullptr, + 0)) + { + auto error = mysql_error(mysql); + mysql_close(mysql); + throw std::runtime_error( + std::string("Failed to connect to MySQL: ") + error); + } + + // Try to select the database first + if (mysql_select_db(mysql, config_.name.c_str())) + { + // Database selection failed, try to create it + std::string create_db_query = "CREATE DATABASE IF NOT EXISTS " + + std::string(config_.name.c_str()); + + if (mysql_query(mysql, create_db_query.c_str())) + { + // Creation failed for some reason + auto error = mysql_error(mysql); + mysql_close(mysql); + throw std::runtime_error( + std::string("Failed to create database: ") + error); + } + } + + // Try selecting again (either after creation or if it existed already) + if (mysql_select_db(mysql, config_.name.c_str())) + { + auto error = mysql_error(mysql); + mysql_close(mysql); + throw std::runtime_error( + std::string("Failed to select database: ") + error); + } + + return mysql; + } + + // Get the thread-local MySQL connection, creating it if necessary + MYSQL* + getConnection() + { + if (!threadLocalMySQL_) + { + threadLocalMySQL_.reset(initializeConnection()); + } + return threadLocalMySQL_.get(); + } + + // Schema creation statements + static constexpr auto CREATE_LEDGERS_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS ledgers ( + ledger_seq BIGINT PRIMARY KEY, + ledger_hash VARCHAR(64) UNIQUE NOT NULL, + parent_hash VARCHAR(64) NOT NULL, + total_coins BIGINT NOT NULL, + closing_time BIGINT NOT NULL, + prev_closing_time BIGINT NOT NULL, + close_time_resolution BIGINT NOT NULL, + close_flags INT NOT NULL, + account_hash VARCHAR(64) NOT NULL, + tx_hash VARCHAR(64) NOT NULL + ) + )SQL"; + + static constexpr auto CREATE_TRANSACTIONS_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS transactions ( + tx_hash VARCHAR(64) PRIMARY KEY, + ledger_seq BIGINT NOT NULL, + tx_seq INT NOT NULL, + raw_tx MEDIUMBLOB NOT NULL, + meta_data MEDIUMBLOB NOT NULL, + FOREIGN KEY (ledger_seq) REFERENCES ledgers(ledger_seq) + ) + )SQL"; + + static constexpr auto CREATE_ACCOUNT_TRANSACTIONS_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS account_transactions ( + account_id VARCHAR(64) NOT NULL, + tx_hash VARCHAR(64) NOT NULL, + ledger_seq BIGINT NOT NULL, + tx_seq INT NOT NULL, + PRIMARY KEY (account_id, ledger_seq, tx_seq), + FOREIGN KEY (tx_hash) REFERENCES transactions(tx_hash), + FOREIGN KEY (ledger_seq) REFERENCES ledgers(ledger_seq) + ) + )SQL"; + +public: + MySQLDatabase(Application& app, Config const& config, JobQueue& jobQueue) + : app_(app), useTxTables_(config.useTxTables()) + { + if (!config.mysql.has_value()) + throw std::runtime_error( + "[mysql_settings] stanza missing from config!"); + + // Store configuration for creating new connections + config_.host = config.mysql->host; + config_.user = config.mysql->user; + config_.pass = config.mysql->pass; + config_.name = config.mysql->name; + config_.port = config.mysql->port; + + // Initialize first connection and create schema + auto mysql = getConnection(); + + // Create database if it doesn't exist + std::string create_db = "CREATE DATABASE IF NOT EXISTS " + config_.name; + if (mysql_query(mysql, create_db.c_str())) + throw std::runtime_error( + std::string("Failed to create database: ") + + mysql_error(mysql)); + + // Create schema tables + if (mysql_query(mysql, CREATE_LEDGERS_TABLE)) + throw std::runtime_error( + std::string("Failed to create ledgers table: ") + + mysql_error(mysql)); + + if (useTxTables_) + { + if (mysql_query(mysql, CREATE_TRANSACTIONS_TABLE)) + throw std::runtime_error( + std::string("Failed to create transactions table: ") + + mysql_error(mysql)); + + if (mysql_query(mysql, CREATE_ACCOUNT_TRANSACTIONS_TABLE)) + throw std::runtime_error( + std::string( + "Failed to create account_transactions table: ") + + mysql_error(mysql)); + } + } + + bool + saveValidatedLedger( + std::shared_ptr const& ledger, + bool current) override + { + auto j = app_.journal("Ledger"); + auto seq = ledger->info().seq; + + if (!ledger->info().accountHash.isNonZero()) + { + JLOG(j.fatal()) << "AH is zero: " << getJson({*ledger, {}}); + assert(false); + return false; + } + + // Save the ledger header + std::stringstream sql; + sql << "INSERT INTO ledgers (" + << "ledger_seq, ledger_hash, parent_hash, total_coins, " + << "closing_time, prev_closing_time, close_time_resolution, " + << "close_flags, account_hash, tx_hash) VALUES (" << seq << ", " + << "'" << strHex(ledger->info().hash) << "', " + << "'" << strHex(ledger->info().parentHash) << "', " + << ledger->info().drops.drops() << ", " + << ledger->info().closeTime.time_since_epoch().count() << ", " + << ledger->info().parentCloseTime.time_since_epoch().count() << ", " + << ledger->info().closeTimeResolution.count() << ", " + << ledger->info().closeFlags << ", " + << "'" << strHex(ledger->info().accountHash) << "', " + << "'" << strHex(ledger->info().txHash) << "') " + << "ON DUPLICATE KEY UPDATE " + << "parent_hash = VALUES(parent_hash), " + << "total_coins = VALUES(total_coins), " + << "closing_time = VALUES(closing_time), " + << "prev_closing_time = VALUES(prev_closing_time), " + << "close_time_resolution = VALUES(close_time_resolution), " + << "close_flags = VALUES(close_flags), " + << "account_hash = VALUES(account_hash), " + << "tx_hash = VALUES(tx_hash)"; + + if (mysql_query(getConnection(), sql.str().c_str())) + { + JLOG(j.fatal()) + << "Failed to save ledger: " << mysql_error(getConnection()); + return false; + } + + if (useTxTables_) + { + std::shared_ptr aLedger; + try + { + aLedger = + app_.getAcceptedLedgerCache().fetch(ledger->info().hash); + if (!aLedger) + { + aLedger = std::make_shared(ledger, app_); + app_.getAcceptedLedgerCache().canonicalize_replace_client( + ledger->info().hash, aLedger); + } + } + catch (std::exception const&) + { + JLOG(j.warn()) << "An accepted ledger was missing nodes"; + return false; + } + + // Start a transaction for saving all transactions + if (mysql_query(getConnection(), "START TRANSACTION")) + { + JLOG(j.fatal()) << "Failed to start transaction: " + << mysql_error(getConnection()); + return false; + } + + try + { + for (auto const& acceptedLedgerTx : *aLedger) + { + auto const& txn = acceptedLedgerTx->getTxn(); + auto const& meta = acceptedLedgerTx->getMeta(); + auto const& id = txn->getTransactionID(); + + // Save transaction + std::stringstream txSql; + txSql << "INSERT INTO transactions (" + << "tx_hash, ledger_seq, tx_seq, raw_tx, meta_data) " + "VALUES (" + << "'" << strHex(id) << "', " << seq << ", " + << acceptedLedgerTx->getTxnSeq() << ", " + << "?, ?) " // Using placeholders for BLOB data + << "ON DUPLICATE KEY UPDATE " + << "ledger_seq = VALUES(ledger_seq), " + << "tx_seq = VALUES(tx_seq), " + << "raw_tx = VALUES(raw_tx), " + << "meta_data = VALUES(meta_data)"; + + MYSQL_STMT* stmt = mysql_stmt_init(getConnection()); + if (!stmt) + { + throw std::runtime_error( + "Failed to initialize statement"); + } + + if (mysql_stmt_prepare( + stmt, txSql.str().c_str(), txSql.str().length())) + { + mysql_stmt_close(stmt); + throw std::runtime_error("Failed to prepare statement"); + } + + // Bind parameters for BLOB data + MYSQL_BIND bind[2]; + memset(bind, 0, sizeof(bind)); + + Serializer s; + txn->add(s); + bind[0].buffer_type = MYSQL_TYPE_BLOB; + bind[0].buffer = (void*)s.data(); + bind[0].buffer_length = s.size(); + + Serializer s2; + meta.getAsObject().addWithoutSigningFields(s2); + + bind[1].buffer_type = MYSQL_TYPE_BLOB; + bind[1].buffer = (void*)s2.data(); + bind[1].buffer_length = s2.size(); + + if (mysql_stmt_bind_param(stmt, bind)) + { + mysql_stmt_close(stmt); + throw std::runtime_error("Failed to bind parameters"); + } + + if (mysql_stmt_execute(stmt)) + { + mysql_stmt_close(stmt); + throw std::runtime_error("Failed to execute statement"); + } + + mysql_stmt_close(stmt); + + // Save account transactions + for (auto const& account : meta.getAffectedAccounts()) + { + std::stringstream accTxSql; + accTxSql << "INSERT INTO account_transactions (" + << "account_id, tx_hash, ledger_seq, tx_seq) " + "VALUES (" + << "'" << strHex(account) << "', " + << "'" << strHex(id) << "', " << seq << ", " + << acceptedLedgerTx->getTxnSeq() << ") " + << "ON DUPLICATE KEY UPDATE " + << "tx_hash = VALUES(tx_hash)"; + + if (mysql_query( + getConnection(), accTxSql.str().c_str())) + { + throw std::runtime_error( + mysql_error(getConnection())); + } + } + + app_.getMasterTransaction().inLedger( + id, + seq, + acceptedLedgerTx->getTxnSeq(), + app_.config().NETWORK_ID); + } + + if (mysql_query(getConnection(), "COMMIT")) + { + throw std::runtime_error(mysql_error(getConnection())); + } + } + catch (std::exception const& e) + { + JLOG(j.fatal()) << "Error saving transactions: " << e.what(); + mysql_query(getConnection(), "ROLLBACK"); + return false; + } + } + + return true; + } + + std::optional + getMinLedgerSeq() override + { + if (mysql_query(getConnection(), "SELECT MIN(ledger_seq) FROM ledgers")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return seq; + } + + std::optional + getTransactionsMinLedgerSeq() override + { + if (!useTxTables_) + return {}; + + if (mysql_query( + getConnection(), "SELECT MIN(ledger_seq) FROM transactions")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return seq; + } + + std::optional + getMaxLedgerSeq() override + { + if (mysql_query(getConnection(), "SELECT MAX(ledger_seq) FROM ledgers")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return seq; + } + + void + deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override + { + if (!useTxTables_) + return; + + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq = " + << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + + sql.str(""); + sql << "DELETE FROM transactions WHERE ledger_seq = " << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + } + + void + deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + if (useTxTables_) + { + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq < " + << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + + sql.str(""); + sql << "DELETE FROM transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + } + + std::stringstream sql; + sql << "DELETE FROM ledgers WHERE ledger_seq < " << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + } + + void + deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + if (!useTxTables_) + return; + + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq < " + << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + + sql.str(""); + sql << "DELETE FROM transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + } + + void + deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + if (!useTxTables_) + return; + + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq < " + << ledgerSeq; + mysql_query(getConnection(), sql.str().c_str()); + } + + std::size_t + getTransactionCount() override + { + if (!useTxTables_) + return 0; + + if (mysql_query(getConnection(), "SELECT COUNT(*) FROM transactions")) + return 0; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return 0; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return 0; + } + + std::size_t count = std::stoull(row[0]); + mysql_free_result(result); + return count; + } + + std::size_t + getAccountTransactionCount() override + { + if (!useTxTables_) + return 0; + + if (mysql_query( + getConnection(), "SELECT COUNT(*) FROM account_transactions")) + return 0; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return 0; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return 0; + } + + std::size_t count = std::stoull(row[0]); + mysql_free_result(result); + return count; + } + + CountMinMax + getLedgerCountMinMax() override + { + if (mysql_query( + getConnection(), + "SELECT COUNT(*), MIN(ledger_seq), MAX(ledger_seq) FROM " + "ledgers")) + return {0, 0, 0}; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return {0, 0, 0}; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0] || !row[1] || !row[2]) + { + mysql_free_result(result); + return {0, 0, 0}; + } + + CountMinMax ret{ + std::stoull(row[0]), + static_cast(std::stoll(row[1])), + static_cast(std::stoll(row[2]))}; + mysql_free_result(result); + return ret; + } + + std::optional + getLedgerInfoByIndex(LedgerIndex ledgerSeq) override + { + std::stringstream sql; + sql << "SELECT ledger_hash, parent_hash, total_coins, closing_time, " + << "prev_closing_time, close_time_resolution, close_flags, " + << "account_hash, tx_hash FROM ledgers WHERE ledger_seq = " + << ledgerSeq; + + if (mysql_query(getConnection(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerInfo info; + info.seq = ledgerSeq; + info.hash = uint256(row[0]); + info.parentHash = uint256(row[1]); + info.drops = XRPAmount(std::stoull(row[2])); + info.closeTime = + NetClock::time_point{NetClock::duration{std::stoll(row[3])}}; + info.parentCloseTime = + NetClock::time_point{NetClock::duration{std::stoll(row[4])}}; + info.closeTimeResolution = NetClock::duration{std::stoll(row[5])}; + info.closeFlags = std::stoul(row[6]); + info.accountHash = uint256(row[7]); + info.txHash = uint256(row[8]); + + mysql_free_result(result); + return info; + } + + std::optional + getLimitedOldestLedgerInfo(LedgerIndex ledgerFirstIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= " + << ledgerFirstIndex << " ORDER BY ledger_seq ASC LIMIT 1"; + + if (mysql_query(getConnection(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + std::optional + getLimitedNewestLedgerInfo(LedgerIndex ledgerFirstIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= " + << ledgerFirstIndex << " ORDER BY ledger_seq DESC LIMIT 1"; + + if (mysql_query(getConnection(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + std::optional + getLedgerInfoByHash(uint256 const& ledgerHash) override + { + std::stringstream sql; + sql << "SELECT ledger_seq FROM ledgers WHERE ledger_hash = '" + << strHex(ledgerHash) << "'"; + + if (mysql_query(getConnection(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + uint256 + getHashByIndex(LedgerIndex ledgerIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_hash FROM ledgers WHERE ledger_seq = " + << ledgerIndex; + + if (mysql_query(getConnection(), sql.str().c_str())) + return uint256(); + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return uint256(); + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return uint256(); + } + + uint256 hash(row[0]); + mysql_free_result(result); + return hash; + } + + std::optional + getHashesByIndex(LedgerIndex ledgerIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_hash, parent_hash FROM ledgers WHERE ledger_seq " + "= " + << ledgerIndex; + + if (mysql_query(getConnection(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0] || !row[1]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerHashPair pair{uint256(row[0]), uint256(row[1])}; + mysql_free_result(result); + return pair; + } + + std::map + getHashesByIndex(LedgerIndex minSeq, LedgerIndex maxSeq) override + { + std::map result; + std::stringstream sql; + sql << "SELECT ledger_seq, ledger_hash, parent_hash FROM ledgers " + << "WHERE ledger_seq BETWEEN " << minSeq << " AND " << maxSeq + << " ORDER BY ledger_seq"; + + if (mysql_query(getConnection(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + LedgerIndex const seq = + static_cast(std::stoull(row[0])); + result.emplace( + seq, LedgerHashPair{uint256{row[1]}, uint256{row[2]}}); + } + + mysql_free_result(sqlResult); + return result; + } + + std::optional + getAccountTransactionsMinLedgerSeq() override + { + if (!useTxTables_) + return {}; + + if (mysql_query( + getConnection(), + "SELECT MIN(ledger_seq) FROM account_transactions")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = static_cast(std::stoull(row[0])); + mysql_free_result(result); + return seq; + } + + std::optional + getNewestLedgerInfo() override + { + if (mysql_query( + getConnection(), + "SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT " + "1")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = static_cast(std::stoull(row[0])); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + std::variant + getTransaction( + uint256 const& id, + std::optional> const& range, + error_code_i& ec) override + { + if (!useTxTables_) + return TxSearched::unknown; + + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM transactions t WHERE t.tx_hash = '" << strHex(id) << "'"; + + if (range) + { + sql << " AND t.ledger_seq BETWEEN " << range->first() << " AND " + << range->last(); + } + + if (mysql_query(getConnection(), sql.str().c_str())) + return TxSearched::unknown; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return TxSearched::unknown; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row) + { + mysql_free_result(result); + if (range) + { + sql.str(""); + sql << "SELECT COUNT(*) FROM ledgers WHERE ledger_seq BETWEEN " + << range->first() << " AND " << range->last(); + + if (mysql_query(getConnection(), sql.str().c_str())) + return TxSearched::unknown; + + result = mysql_store_result(getConnection()); + if (!result) + return TxSearched::unknown; + + row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return TxSearched::unknown; + } + + std::size_t count = std::stoull(row[0]); + mysql_free_result(result); + + return (count == (range->last() - range->first() + 1)) + ? TxSearched::all + : TxSearched::some; + } + return TxSearched::unknown; + } + + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + { + mysql_free_result(result); + return TxSearched::unknown; + } + + // Deserialize transaction and metadata + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + + auto meta = std::make_shared( + id, + static_cast(std::stoull(row[2])), + Blob(row[1], row[1] + lengths[1])); + + mysql_free_result(result); + + AccountTx at; + std::string reason; + at.first = std::make_shared(txn, reason, app_); + at.first->setStatus(COMMITTED); + at.first->setLedger(static_cast(std::stoull(row[2]))); + at.second = meta; + + return at; + } + catch (std::exception const&) + { + mysql_free_result(result); + return TxSearched::unknown; + } + } + + std::pair> + oldestAccountTxPage(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + static std::uint32_t const page_length(200); + auto onUnsavedLedger = + std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1); + AccountTxs ret; + Application& app = app_; + auto onTransaction = [&ret, &app]( + std::uint32_t ledger_index, + std::string const& status, + Blob&& rawTxn, + Blob&& rawMeta) { + convertBlobsToTxResult( + ret, ledger_index, status, rawTxn, rawMeta, app); + }; + + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger << " ORDER BY at.ledger_seq, at.tx_seq" + << " LIMIT " << (options.limit + 1); + + if (mysql_query(getConnection(), sql.str().c_str())) + return {}; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(result)) && + (!options.limit || count < options.limit)) + { + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + continue; + + Blob rawTxn(row[0], row[0] + lengths[0]); + Blob rawMeta(row[1], row[1] + lengths[1]); + std::uint32_t ledgerSeq = + static_cast(std::stoull(row[2])); + std::uint32_t txSeq = + static_cast(std::stoull(row[3])); + + if (count == options.limit) + { + marker = AccountTxMarker{ledgerSeq, txSeq}; + break; + } + + onTransaction( + ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta)); + ++count; + } + + mysql_free_result(result); + return {ret, marker}; + } + + std::pair> + newestAccountTxPage(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + static std::uint32_t const page_length(200); + auto onUnsavedLedger = + std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1); + AccountTxs ret; + Application& app = app_; + auto onTransaction = [&ret, &app]( + std::uint32_t ledger_index, + std::string const& status, + Blob&& rawTxn, + Blob&& rawMeta) { + convertBlobsToTxResult( + ret, ledger_index, status, rawTxn, rawMeta, app); + }; + + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger + << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC" + << " LIMIT " << (options.limit + 1); + + if (mysql_query(getConnection(), sql.str().c_str())) + return {}; + + MYSQL_RES* result = mysql_store_result(getConnection()); + if (!result) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(result)) && + (!options.limit || count < options.limit)) + { + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + continue; + + Blob rawTxn(row[0], row[0] + lengths[0]); + Blob rawMeta(row[1], row[1] + lengths[1]); + std::uint32_t ledgerSeq = + static_cast(std::stoull(row[2])); + std::uint32_t txSeq = + static_cast(std::stoull(row[3])); + + if (count == options.limit) + { + marker = AccountTxMarker{ledgerSeq, txSeq}; + break; + } + + onTransaction( + ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta)); + ++count; + } + + mysql_free_result(result); + return {ret, marker}; + } + + bool + ledgerDbHasSpace(Config const&) override + { + // MySQL manages its own space + return true; + } + + std::vector> + getTxHistory(LedgerIndex startIndex) override + { + if (!useTxTables_) + return {}; + + std::vector> result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.ledger_seq " + << "FROM transactions t " + << "ORDER BY t.ledger_seq DESC, t.tx_seq DESC " + << "LIMIT 20 OFFSET " << startIndex; + + if (mysql_query(getConnection(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + std::string reason; + auto tx = std::make_shared(txn, reason, app_); + + auto const ledgerSeq = + static_cast(std::stoull(row[1])); + tx->setStatus(COMMITTED); + tx->setLedger(ledgerSeq); + result.push_back(tx); + } + catch (std::exception const&) + { + // Skip any malformed transactions + continue; + } + } + + mysql_free_result(sqlResult); + return result; + } + + bool + transactionDbHasSpace(Config const&) override + { + // MySQL manages its own space + return true; + } + + AccountTxs + getOldestAccountTxs(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + AccountTxs result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger + << " ORDER BY at.ledger_seq ASC, at.tx_seq ASC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(getConnection(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + std::string reason; + auto tx = std::make_shared(txn, reason, app_); + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + auto meta = std::make_shared( + txn->getTransactionID(), + ledgerSeq, + Blob(row[1], row[1] + lengths[1])); + + tx->setStatus(COMMITTED); + tx->setLedger(ledgerSeq); + + result.emplace_back(tx, meta); + } + catch (std::exception const&) + { + continue; + } + } + + mysql_free_result(sqlResult); + return result; + } + + AccountTxs + getNewestAccountTxs(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + AccountTxs result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger + << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(getConnection(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + std::string reason; + auto tx = std::make_shared(txn, reason, app_); + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + auto meta = std::make_shared( + txn->getTransactionID(), + ledgerSeq, + Blob(row[1], row[1] + lengths[1])); + + tx->setStatus(COMMITTED); + tx->setLedger(ledgerSeq); + + result.emplace_back(tx, meta); + } + catch (std::exception const&) + { + continue; + } + } + + mysql_free_result(sqlResult); + return result; + } + + MetaTxsList + getOldestAccountTxsB(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger + << " ORDER BY at.ledger_seq ASC, at.tx_seq ASC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(getConnection(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + } + + mysql_free_result(sqlResult); + return result; + } + + MetaTxsList + getNewestAccountTxsB(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger + << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(getConnection(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + } + + mysql_free_result(sqlResult); + return result; + } + + std::pair> + oldestAccountTxPageB(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger; + + if (options.marker) + { + sql << " AND (at.ledger_seq > " << options.marker->ledgerSeq + << " OR (at.ledger_seq = " << options.marker->ledgerSeq + << " AND at.tx_seq > " << options.marker->txnSeq << "))"; + } + + sql << " ORDER BY at.ledger_seq ASC, at.tx_seq ASC "; + sql << "LIMIT " << (options.limit + 1); + + if (mysql_query(getConnection(), sql.str().c_str())) + return {}; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(sqlResult))) + { + if (count >= options.limit) + { + marker = AccountTxMarker{ + static_cast(std::stoull(row[2])), + static_cast(std::stoull(row[3]))}; + break; + } + + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + + ++count; + } + + mysql_free_result(sqlResult); + return {result, marker}; + } + + std::pair> + newestAccountTxPageB(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger << " AND " + << options.maxLedger; + + if (options.marker) + { + sql << " AND (at.ledger_seq < " << options.marker->ledgerSeq + << " OR (at.ledger_seq = " << options.marker->ledgerSeq + << " AND at.tx_seq < " << options.marker->txnSeq << "))"; + } + + sql << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC "; + sql << "LIMIT " << (options.limit + 1); + + if (mysql_query(getConnection(), sql.str().c_str())) + return {}; + + MYSQL_RES* sqlResult = mysql_store_result(getConnection()); + if (!sqlResult) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(sqlResult))) + { + if (count >= options.limit) + { + marker = AccountTxMarker{ + static_cast(std::stoull(row[2])), + static_cast(std::stoull(row[3]))}; + break; + } + + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + + ++count; + } + + mysql_free_result(sqlResult); + return {result, marker}; + } + + std::uint32_t + getKBUsedAll() override + { + std::uint32_t total = 0; + + // Get ledger table size + if (!mysql_query( + getConnection(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND table_name = 'ledgers'")) + { + MYSQL_RES* result = mysql_store_result(getConnection()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total += static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + + // Get transaction tables size + if (useTxTables_) + { + if (!mysql_query( + getConnection(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND (table_name = 'transactions' " + "OR table_name = 'account_transactions')")) + { + MYSQL_RES* result = mysql_store_result(getConnection()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total += + static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + } + + return total; + } + + std::uint32_t + getKBUsedLedger() override + { + std::uint32_t total = 0; + + if (!mysql_query( + getConnection(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND table_name = 'ledgers'")) + { + MYSQL_RES* result = mysql_store_result(getConnection()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total = static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + + return total; + } + + std::uint32_t + getKBUsedTransaction() override + { + if (!useTxTables_) + return 0; + + std::uint32_t total = 0; + + if (!mysql_query( + getConnection(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND (table_name = 'transactions' " + "OR table_name = 'account_transactions')")) + { + MYSQL_RES* result = mysql_store_result(getConnection()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total = static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + + return total; + } + + void + closeLedgerDB() override + { + // No explicit closing needed for MySQL + // The connection will be closed when mysql_ is destroyed + } + + void + closeTransactionDB() override + { + // No explicit closing needed for MySQL + // The connection will be closed when mysql_ is destroyed + } +}; + +// Factory function +std::unique_ptr +getMySQLDatabase(Application& app, Config const& config, JobQueue& jobQueue) +{ + return std::make_unique(app, config, jobQueue); +} +} // namespace ripple +#endif // RIPPLE_APP_RDB_BACKEND_MYSQLDATABASE_H_INCLUDED diff --git a/src/ripple/app/rdb/impl/RelationalDatabase.cpp b/src/ripple/app/rdb/impl/RelationalDatabase.cpp index 64161bd53..cc91f716b 100644 --- a/src/ripple/app/rdb/impl/RelationalDatabase.cpp +++ b/src/ripple/app/rdb/impl/RelationalDatabase.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ RelationalDatabase::init( bool use_postgres = false; bool use_rwdb = false; bool use_flatmap = false; + bool use_mysql = false; if (config.reporting()) { @@ -64,6 +66,10 @@ RelationalDatabase::init( { use_flatmap = true; } + else if (boost::iequals(get(rdb_section, "backend"), "mysql")) + { + use_mysql = true; + } else { Throw( @@ -93,6 +99,10 @@ RelationalDatabase::init( { return getFlatmapDatabase(app, config, jobQueue); } + else if (use_mysql) + { + return getMySQLDatabase(app, config, jobQueue); + } return std::unique_ptr(); } diff --git a/src/ripple/basics/BasicConfig.h b/src/ripple/basics/BasicConfig.h index db293979f..a0714324c 100644 --- a/src/ripple/basics/BasicConfig.h +++ b/src/ripple/basics/BasicConfig.h @@ -36,6 +36,8 @@ using IniFileSections = std::map>; //------------------------------------------------------------------------------ +class Config; + /** Holds a collection of configuration values. A configuration file contains zero or more sections. */ @@ -48,11 +50,22 @@ class Section std::vector values_; bool had_trailing_comments_ = false; + Config const* parent_; + using const_iterator = decltype(lookup_)::const_iterator; public: + // throws if no parent for this section + Config const& + getParent() const + { + if (!parent_) + Throw("No parent_ for config section"); + return *parent_; + } + /** Create an empty section. */ - explicit Section(std::string const& name = ""); + explicit Section(std::string const& name = "", Config* parent = nullptr); /** Returns the name of this section. */ std::string const& @@ -218,6 +231,8 @@ class BasicConfig std::map map_; public: + virtual ~BasicConfig() = default; + /** Returns `true` if a section with the given name exists. */ bool exists(std::string const& name) const; diff --git a/src/ripple/basics/impl/BasicConfig.cpp b/src/ripple/basics/impl/BasicConfig.cpp index f557d2e6d..504b2d9c3 100644 --- a/src/ripple/basics/impl/BasicConfig.cpp +++ b/src/ripple/basics/impl/BasicConfig.cpp @@ -24,7 +24,10 @@ namespace ripple { -Section::Section(std::string const& name) : name_(name) +class Config; + +Section::Section(std::string const& name, Config* parent) + : name_(name), parent_(parent) { } @@ -175,12 +178,14 @@ BasicConfig::legacy(std::string const& sectionName) const void BasicConfig::build(IniFileSections const& ifs) { + Config* config_this = dynamic_cast(this); for (auto const& entry : ifs) { auto const result = map_.emplace( std::piecewise_construct, std::make_tuple(entry.first), - std::make_tuple(entry.first)); + std::make_tuple( + entry.first, config_this)); // Will be nullptr if cast failed result.first->second.append(entry.second); } } diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 3e2c3c81a..4e5754797 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -175,6 +175,17 @@ class Config : public BasicConfig // Network parameters uint32_t NETWORK_ID = 0; + struct MysqlSettings + { + std::string host; + std::string user; + std::string pass; + std::string name; + uint16_t port; + }; + + std::optional mysql; + // DEPRECATED - Fee units for a reference transction. // Only provided for backwards compatibility in a couple of places static constexpr std::uint32_t FEE_UNITS_DEPRECATED = 10; diff --git a/src/ripple/core/ConfigSections.h b/src/ripple/core/ConfigSections.h index def5b3c82..a20e16135 100644 --- a/src/ripple/core/ConfigSections.h +++ b/src/ripple/core/ConfigSections.h @@ -102,6 +102,7 @@ struct ConfigSection #define SECTION_NETWORK_ID "network_id" #define SECTION_IMPORT_VL_KEYS "import_vl_keys" #define SECTION_DATAGRAM_MONITOR "datagram_monitor" +#define SECTION_MYSQL_SETTINGS "mysql_settings" } // namespace ripple diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index 7673d16ec..5ecc021f5 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -756,6 +756,30 @@ Config::loadFromString(std::string const& fileContents) SERVER_DOMAIN = strTemp; } + if (exists(SECTION_MYSQL_SETTINGS)) + { + auto const sec = section(SECTION_MYSQL_SETTINGS); + if (!sec.exists("host") || !sec.exists("user") || !sec.exists("pass") || + !sec.exists("port") || !sec.exists("name")) + { + Throw( + "[mysql_settings] requires host=, user=, pass=, name= and " + "port= keys."); + } + + MysqlSettings my; + + my.host = *sec.get("host"); + my.user = *sec.get("user"); + my.pass = *sec.get("pass"); + my.name = *sec.get("name"); + + std::string portStr = *sec.get("port"); + my.port = beast::lexicalCastThrow(portStr); + + mysql = my; + } + if (exists(SECTION_OVERLAY)) { auto const sec = section(SECTION_OVERLAY); diff --git a/src/ripple/nodestore/backend/MySQLFactory.cpp b/src/ripple/nodestore/backend/MySQLFactory.cpp new file mode 100644 index 000000000..02da4b2bb --- /dev/null +++ b/src/ripple/nodestore/backend/MySQLFactory.cpp @@ -0,0 +1,966 @@ +#ifndef RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED +#define RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { +namespace NodeStore { + +// SQL statements as constants +static constexpr auto CREATE_DATABASE = R"SQL( + CREATE DATABASE IF NOT EXISTS `%s` + CHARACTER SET utf8mb4 + COLLATE utf8mb4_unicode_ci +)SQL"; + +static constexpr auto CREATE_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS `%s` ( + hash BINARY(32) PRIMARY KEY, + data MEDIUMBLOB NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + INDEX idx_created_at (created_at) + ) ENGINE=InnoDB +)SQL"; + +static constexpr auto INSERT_NODE = R"SQL( + INSERT INTO %s (hash, data) + VALUES (?, ?) + ON DUPLICATE KEY UPDATE data = VALUES(data) +)SQL"; + +static constexpr auto SET_ISOLATION_LEVEL = R"SQL( + SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED +)SQL"; + +class MySQLConnection +{ +private: + std::unique_ptr mysql_; + Config const& config_; + beast::Journal journal_; + static constexpr int MAX_RETRY_ATTEMPTS = 3; + static constexpr auto RETRY_DELAY_MS = 1000; + + bool + connect() + { + mysql_.reset(mysql_init(nullptr)); + if (!mysql_) + return false; + + // Set connection options + unsigned int timeout = 5; + mysql_options(mysql_.get(), MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + uint8_t const reconnect = 1; + mysql_options(mysql_.get(), MYSQL_OPT_RECONNECT, &reconnect); + + // Connect without database first + auto* conn = mysql_real_connect( + mysql_.get(), + config_.mysql->host.c_str(), + config_.mysql->user.c_str(), + config_.mysql->pass.c_str(), + nullptr, // No database selected yet + config_.mysql->port, + nullptr, + CLIENT_MULTI_STATEMENTS); + + if (!conn) + return false; + + // Set isolation level for dirty reads + if (mysql_query(mysql_.get(), SET_ISOLATION_LEVEL)) + { + JLOG(journal_.warn()) << "Failed to set isolation level: " + << mysql_error(mysql_.get()); + return false; + } + + // Create database (unconditionally) + std::string query(1024, '\0'); + int length = snprintf( + &query[0], + query.size(), + CREATE_DATABASE, + config_.mysql->name.c_str()); + query.resize(length); + + if (mysql_query(mysql_.get(), query.c_str())) + { + JLOG(journal_.error()) + << "Failed to create database: " << mysql_error(mysql_.get()); + return false; + } + + // Now select the database + if (mysql_select_db(mysql_.get(), config_.mysql->name.c_str())) + { + JLOG(journal_.error()) + << "Failed to select database: " << mysql_error(mysql_.get()); + return false; + } + + return true; + } + +public: + MySQLConnection(Config const& config, beast::Journal journal) + : mysql_(nullptr, mysql_close), config_(config), journal_(journal) + { + if (!config_.mysql.has_value()) + throw std::runtime_error( + "[mysql_settings] stanza missing from config!"); + + if (config_.mysql->name.empty()) + throw std::runtime_error( + "Database name missing from mysql_settings!"); + + if (!connect()) + { + Throw( + std::string("Failed to connect to MySQL: ") + + (mysql_ ? mysql_error(mysql_.get()) : "initialization failed")); + } + } + + MYSQL* + get() + { + return mysql_.get(); + } + + bool + ensureConnection() + { + for (int attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt) + { + if (!mysql_ || mysql_ping(mysql_.get()) != 0) + { + JLOG(journal_.warn()) + << "MySQL connection lost, attempting reconnect (attempt " + << (attempt + 1) << "/" << MAX_RETRY_ATTEMPTS << ")"; + + if (connect()) + return true; + + if (attempt < MAX_RETRY_ATTEMPTS - 1) + { + std::this_thread::sleep_for( + std::chrono::milliseconds(RETRY_DELAY_MS)); + } + } + else + { + return true; + } + } + return false; + } + + // Helper method to execute a query with retry logic + bool + executeQuery(std::string const& query) + { + for (int attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt) + { + if (ensureConnection() && !mysql_query(mysql_.get(), query.c_str())) + return true; + + if (attempt < MAX_RETRY_ATTEMPTS - 1) + { + std::this_thread::sleep_for( + std::chrono::milliseconds(RETRY_DELAY_MS)); + } + } + return false; + } +}; + +static thread_local std::unique_ptr threadConnection_; + +class MySQLBackend : public Backend +{ +private: + std::string name_; + beast::Journal journal_; + bool isOpen_{false}; + Config const& config_; + static constexpr std::size_t BATCH_SIZE = 1000; + static constexpr std::size_t MAX_CACHE_SIZE = + 100000; // Maximum number of entries + static constexpr std::size_t CACHE_CLEANUP_THRESHOLD = + 120000; // When to trigger cleanup + + using DataStore = std::map>; + DataStore cache_; + std::mutex cacheMutex_; + + // LRU tracking for cache management + struct CacheEntry + { + std::chrono::steady_clock::time_point last_access; + size_t size; + bool pending{false}; + }; + + std::map cacheMetadata_; + std::mutex metadataMutex_; + std::atomic currentCacheSize_{0}; + + // Background write queue + struct WriteOp + { + uint256 hash; + std::vector data; + }; + std::queue writeQueue_; + std::mutex queueMutex_; + std::condition_variable queueCV_; + std::atomic shouldStop_{false}; + std::thread writeThread_; + + MySQLConnection* + getConnection() + { + if (!threadConnection_) + { + threadConnection_ = + std::make_unique(config_, journal_); + } + return threadConnection_.get(); + } + + std::string + sanitizeTableName(std::string name) + { + name.erase( + std::unique( + name.begin(), + std::transform( + name.begin(), + name.end(), + name.begin(), + [](char c) { return std::isalnum(c) ? c : '_'; })), + name.end()); + return "nodes_" + name; + } + + void + cleanupCache() + { + if (currentCacheSize_.load() < CACHE_CLEANUP_THRESHOLD) + return; + + // Collect entries sorted by last access time + std::vector> + entries; + { + std::lock_guard metadataLock(metadataMutex_); + for (const auto& [hash, metadata] : cacheMetadata_) + { + if (!metadata.pending) + entries.emplace_back(hash, metadata.last_access); + } + } + + // Sort by access time, oldest first + std::sort( + entries.begin(), entries.end(), [](const auto& a, const auto& b) { + return a.second < b.second; + }); + + // Remove oldest entries until we're below target size + size_t removedSize = 0; + for (const auto& entry : entries) + { + if (currentCacheSize_.load() <= MAX_CACHE_SIZE) + break; + + { + std::lock_guard metadataLock(metadataMutex_); + auto metaIt = cacheMetadata_.find(entry.first); + if (metaIt != cacheMetadata_.end()) + { + removedSize += metaIt->second.size; + cacheMetadata_.erase(metaIt); + } + } + { + std::lock_guard cacheLock(cacheMutex_); + cache_.erase(entry.first); + } + currentCacheSize_--; + } + + JLOG(journal_.debug()) + << "Cache cleanup removed " << removedSize + << " bytes, current size: " << currentCacheSize_.load(); + } + + void + updateCacheMetadata(const uint256& hash, size_t size) + { + CacheEntry entry{std::chrono::steady_clock::now(), size}; + { + std::lock_guard metadataLock(metadataMutex_); + cacheMetadata_[hash] = entry; + } + + if (++currentCacheSize_ >= CACHE_CLEANUP_THRESHOLD) + { + cleanupCache(); + } + } + + Status + fetch(void const* key, std::shared_ptr* pObject) override + { + if (!isOpen_) + return notFound; + + uint256 const hash(uint256::fromVoid(key)); + + // Check cache first + { + std::lock_guard cacheLock(cacheMutex_); + auto it = cache_.find(hash); + if (it != cache_.end()) + { + // Update access time + { + std::lock_guard metadataLock(metadataMutex_); + auto metaIt = cacheMetadata_.find(hash); + if (metaIt != cacheMetadata_.end()) + { + metaIt->second.last_access = + std::chrono::steady_clock::now(); + } + } + + nudb::detail::buffer decompressed; + auto const result = nodeobject_decompress( + it->second.data(), it->second.size(), decompressed); + + DecodedBlob decoded(hash.data(), result.first, result.second); + if (decoded.wasOk()) + { + *pObject = decoded.createObject(); + return ok; + } + } + } + + // If not in cache, fetch from MySQL + return fetchFromMySQL(key, pObject); + } + + void + startWriteThread() + { + writeThread_ = std::thread([this]() { + while (!shouldStop_) + { + std::vector batch; + { + std::unique_lock lock(queueMutex_); + queueCV_.wait_for( + lock, std::chrono::milliseconds(100), [this]() { + return !writeQueue_.empty() || shouldStop_; + }); + + // Grab up to BATCH_SIZE operations + while (!writeQueue_.empty() && batch.size() < BATCH_SIZE) + { + batch.push_back(std::move(writeQueue_.front())); + writeQueue_.pop(); + } + } + + if (!batch.empty()) + { + auto* conn = getConnection(); + if (!conn->ensureConnection()) + continue; + + if (mysql_query(conn->get(), "START TRANSACTION")) + continue; + + bool success = true; + for (auto const& op : batch) + { + MYSQL_STMT* stmt = mysql_stmt_init(conn->get()); + if (!stmt) + { + success = false; + break; + } + + std::string const sql = "INSERT INTO " + name_ + + " (hash, data) VALUES (?, ?) " + + "ON DUPLICATE KEY UPDATE data = VALUES(data)"; + + if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length())) + { + mysql_stmt_close(stmt); + success = false; + break; + } + + MYSQL_BIND bind[2]; + std::memset(bind, 0, sizeof(bind)); + + bind[0].buffer_type = MYSQL_TYPE_BLOB; + bind[0].buffer = const_cast( + static_cast(op.hash.data())); + bind[0].buffer_length = op.hash.size(); + + bind[1].buffer_type = MYSQL_TYPE_BLOB; + bind[1].buffer = const_cast(op.data.data()); + bind[1].buffer_length = op.data.size(); + + if (mysql_stmt_bind_param(stmt, bind)) + { + mysql_stmt_close(stmt); + success = false; + break; + } + + if (mysql_stmt_execute(stmt)) + { + mysql_stmt_close(stmt); + success = false; + break; + } + + mysql_stmt_close(stmt); + } + + if (success) + { + if (mysql_query(conn->get(), "COMMIT") == 0) + { + // Clear pending flag for successfully written + // entries + std::lock_guard metadataLock( + metadataMutex_); + for (const auto& op : batch) + { + auto it = cacheMetadata_.find(op.hash); + if (it != cacheMetadata_.end()) + it->second.pending = false; + } + } + } + else + mysql_query(conn->get(), "ROLLBACK"); + } + } + }); + } + + void + queueWrite(uint256 const& hash, std::vector const& data) + { + { + std::lock_guard metadataLock(metadataMutex_); + auto& entry = cacheMetadata_[hash]; + entry.pending = true; + } + std::lock_guard lock(queueMutex_); + writeQueue_.push({hash, data}); + queueCV_.notify_one(); + } + + Status + fetchFromMySQL(void const* key, std::shared_ptr* pObject) + { + auto* conn = getConnection(); + if (!conn->ensureConnection()) + { + JLOG(journal_.warn()) << "fetch: Failed to ensure connection"; + return dataCorrupt; + } + + uint256 const hash(uint256::fromVoid(key)); + + MYSQL_STMT* stmt = mysql_stmt_init(conn->get()); + if (!stmt) + { + JLOG(journal_.warn()) << "fetch: Failed to init stmt"; + return dataCorrupt; + } + + std::string const sql = "SELECT data FROM " + name_ + " WHERE hash = ?"; + + if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length())) + { + JLOG(journal_.warn()) << "fetch: Failed to prepare stmt"; + mysql_stmt_close(stmt); + return dataCorrupt; + } + + MYSQL_BIND bindParam; + std::memset(&bindParam, 0, sizeof(bindParam)); + bindParam.buffer_type = MYSQL_TYPE_BLOB; + bindParam.buffer = + const_cast(static_cast(hash.data())); + bindParam.buffer_length = hash.size(); + + if (mysql_stmt_bind_param(stmt, &bindParam)) + { + JLOG(journal_.warn()) << "fetch: Failed to bind param"; + mysql_stmt_close(stmt); + return dataCorrupt; + } + + if (mysql_stmt_execute(stmt)) + { + mysql_stmt_close(stmt); + return notFound; + } + + MYSQL_BIND bindResult; + std::memset(&bindResult, 0, sizeof(bindResult)); + uint64_t length = 0; + +#if MYSQL_VERSION_ID < 80000 + char +#else + bool +#endif + is_null = 0; + bindResult.buffer_type = MYSQL_TYPE_BLOB; + bindResult.length = &length; + bindResult.is_null = &is_null; + + std::vector buffer(16 * 1024 * 1024); // 16MB buffer + bindResult.buffer = buffer.data(); + bindResult.buffer_length = buffer.size(); + + if (mysql_stmt_bind_result(stmt, &bindResult)) + { + JLOG(journal_.warn()) << "fetch: Failed to bind result"; + mysql_stmt_close(stmt); + return dataCorrupt; + } + + if (mysql_stmt_store_result(stmt)) + { + JLOG(journal_.warn()) << "fetch: Failed to store result"; + mysql_stmt_close(stmt); + return dataCorrupt; + } + + if (mysql_stmt_num_rows(stmt) == 0) + { + mysql_stmt_close(stmt); + return notFound; + } + + if (mysql_stmt_fetch(stmt)) + { + JLOG(journal_.warn()) << "fetch: Failed to fetch stmt"; + mysql_stmt_close(stmt); + return dataCorrupt; + } + + mysql_stmt_close(stmt); + + // Add to cache + std::vector cached_data( + buffer.begin(), buffer.begin() + length); + cache_.insert_or_assign(hash, cached_data); + updateCacheMetadata(hash, length); + + nudb::detail::buffer decompressed; + auto const result = nodeobject_decompress( + cached_data.data(), cached_data.size(), decompressed); + + DecodedBlob decoded(hash.data(), result.first, result.second); + if (!decoded.wasOk()) + { + JLOG(journal_.warn()) << "fetch: Failed to decode blob"; + return dataCorrupt; + } + + *pObject = decoded.createObject(); + return ok; + } + +public: + MySQLBackend( + std::size_t keyBytes, + Section const& keyValues, + beast::Journal journal) + : name_(sanitizeTableName(get(keyValues, "path", "nodestore"))) + , journal_(journal) + , config_(keyValues.getParent()) + { + startWriteThread(); + } + + ~MySQLBackend() + { + shouldStop_ = true; + queueCV_.notify_all(); + if (writeThread_.joinable()) + writeThread_.join(); + } + + std::string + getName() override + { + return name_; + } + + void + open(bool createIfMissing) override + { + if (isOpen_) + Throw("database already open"); + + auto* conn = getConnection(); + if (!conn->ensureConnection()) + Throw("Failed to establish MySQL connection"); + + if (createIfMissing) + createTable(); + + isOpen_ = true; + } + + bool + isOpen() override + { + return isOpen_; + } + + void + close() override + { + // Wait for write queue to empty + { + std::unique_lock lock(queueMutex_); + while (!writeQueue_.empty()) + { + queueCV_.wait(lock); + } + } + + threadConnection_.reset(); + cache_.clear(); + cacheMetadata_.clear(); + currentCacheSize_ = 0; + isOpen_ = false; + } + + std::pair>, Status> + fetchBatch(std::vector const& hashes) override + { + std::vector> results; + results.reserve(hashes.size()); + + std::vector mysqlFetch; + mysqlFetch.reserve(hashes.size()); + + // First try cache + for (auto const& h : hashes) + { + auto it = cache_.find(*h); + if (it != cache_.end()) + { + // Update access time + auto metaIt = cacheMetadata_.find(*h); + if (metaIt != cacheMetadata_.end()) + { + metaIt->second.last_access = + std::chrono::steady_clock::now(); + } + + nudb::detail::buffer decompressed; + auto const result = nodeobject_decompress( + it->second.data(), it->second.size(), decompressed); + + DecodedBlob decoded(h->data(), result.first, result.second); + if (decoded.wasOk()) + { + results.push_back(decoded.createObject()); + continue; + } + } + + mysqlFetch.push_back(h); + results.push_back(nullptr); // Placeholder for MySQL fetch + } + + // If everything was in cache, return early + if (mysqlFetch.empty()) + return {results, ok}; + + // Fetch remaining from MySQL + auto* conn = getConnection(); + if (!conn->ensureConnection()) + return {results, dataCorrupt}; + + if (mysql_query(conn->get(), "START TRANSACTION")) + return {results, dataCorrupt}; + + try + { + for (size_t i = 0; i < mysqlFetch.size(); ++i) + { + std::shared_ptr nObj; + Status status = fetchFromMySQL(mysqlFetch[i]->data(), &nObj); + + // Find the original position in results + auto originalPos = std::distance( + hashes.begin(), + std::find(hashes.begin(), hashes.end(), mysqlFetch[i])); + + results[originalPos] = (status == ok ? nObj : nullptr); + } + + if (mysql_query(conn->get(), "COMMIT")) + return {results, dataCorrupt}; + + return {results, ok}; + } + catch (...) + { + mysql_query(conn->get(), "ROLLBACK"); + throw; + } + } + + void + store(std::shared_ptr const& object) override + { + if (!isOpen_ || !object) + return; + + EncodedBlob encoded(object); + nudb::detail::buffer compressed; + auto const result = nodeobject_compress( + encoded.getData(), encoded.getSize(), compressed); + + std::vector data( + static_cast(result.first), + static_cast(result.first) + result.second); + + // Update cache immediately + cache_.insert_or_assign(object->getHash(), data); + updateCacheMetadata(object->getHash(), data.size()); + + // Queue async write to MySQL + queueWrite(object->getHash(), data); + } + + void + storeBatch(Batch const& batch) override + { + for (auto const& e : batch) + { + if (!e) + continue; + + EncodedBlob encoded(e); + nudb::detail::buffer compressed; + auto const result = nodeobject_compress( + encoded.getData(), encoded.getSize(), compressed); + + std::vector data( + static_cast(result.first), + static_cast(result.first) + result.second); + + // Update cache immediately + cache_.insert_or_assign(e->getHash(), data); + updateCacheMetadata(e->getHash(), data.size()); + + // Queue async write to MySQL + queueWrite(e->getHash(), data); + } + } + + void + sync() override + { + // Wait for write queue to empty + std::unique_lock lock(queueMutex_); + while (!writeQueue_.empty()) + { + queueCV_.wait(lock); + } + } + + void + for_each(std::function)> f) override + { + if (!isOpen_) + return; + + // First, process all cached entries + std::vector>> + cached_entries; + for (const auto& entry : cache_) + { + cached_entries.push_back(entry); + } + + for (const auto& entry : cached_entries) + { + nudb::detail::buffer decompressed; + auto const result = nodeobject_decompress( + entry.second.data(), entry.second.size(), decompressed); + + DecodedBlob decoded( + entry.first.data(), result.first, result.second); + if (decoded.wasOk()) + f(decoded.createObject()); + } + + // Then fetch any remaining entries from MySQL + auto* conn = getConnection(); + if (!conn->ensureConnection()) + return; + + if (mysql_query( + conn->get(), + ("SELECT hash, data FROM " + name_ + " ORDER BY created_at") + .c_str())) + return; + + MYSQL_RES* result = mysql_store_result(conn->get()); + if (!result) + return; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(result))) + { + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + continue; + + uint256 hash; + std::memcpy(hash.data(), row[0], hash.size()); + + // Skip if already processed from cache + if (cache_.find(hash) != cache_.end()) + continue; + + nudb::detail::buffer decompressed; + auto const decomp_result = nodeobject_decompress( + row[1], static_cast(lengths[1]), decompressed); + + DecodedBlob decoded( + hash.data(), decomp_result.first, decomp_result.second); + + if (decoded.wasOk()) + { + auto obj = decoded.createObject(); + f(obj); + + // Add to cache for future use + std::vector data( + reinterpret_cast(row[1]), + reinterpret_cast(row[1]) + lengths[1]); + cache_.insert_or_assign(hash, std::move(data)); + updateCacheMetadata(hash, lengths[1]); + } + } + + mysql_free_result(result); + } + + int + getWriteLoad() override + { + std::lock_guard lock(queueMutex_); + return static_cast(writeQueue_.size()); + } + + void + setDeletePath() override + { + close(); + } + + int + fdRequired() const override + { + return 1; + } + +private: + void + createTable() + { + auto* conn = getConnection(); + if (!conn->ensureConnection()) + Throw("Failed to connect to MySQL server"); + + std::string query(1024, '\0'); + int length = + snprintf(&query[0], query.size(), CREATE_TABLE, name_.c_str()); + query.resize(length); + + if (!conn->executeQuery(query)) + { + JLOG(journal_.error()) + << "Failed to create table: " << mysql_error(conn->get()); + Throw("Failed to create table"); + } + } +}; + +class MySQLFactory : public Factory +{ +public: + MySQLFactory() + { + Manager::instance().insert(*this); + } + + ~MySQLFactory() override + { + Manager::instance().erase(*this); + } + + std::string + getName() const override + { + return "MySQL"; + } + + std::unique_ptr + createInstance( + std::size_t keyBytes, + Section const& keyValues, + std::size_t burstSize, + Scheduler& scheduler, + beast::Journal journal) override + { + return std::make_unique(keyBytes, keyValues, journal); + } +}; + +static MySQLFactory mysqlFactory; + +} // namespace NodeStore +} // namespace ripple + +#endif // RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED