Skip to content

Commit

Permalink
feat: supplement RocksDB Metrics.
Browse files Browse the repository at this point in the history
Supplement RocksDB Metrics.

Fixes: #1559

Signed-off-by: yaoyinnan <[email protected]>
  • Loading branch information
yaoyinnan committed May 30, 2023
1 parent 47a7e64 commit 006975c
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 12 deletions.
3 changes: 3 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class InfoCmd : public Cmd {
kInfoKeyspace,
kInfoLog,
kInfoData,
kInfoRocksDB,
kInfo,
kInfoAll,
kInfoDebug
Expand Down Expand Up @@ -230,6 +231,7 @@ class InfoCmd : public Cmd {
const static std::string kReplicationSection;
const static std::string kKeyspaceSection;
const static std::string kDataSection;
const static std::string kRocksDBSection;
const static std::string kDebugSection;

void DoInitial() override;
Expand All @@ -248,6 +250,7 @@ class InfoCmd : public Cmd {
void InfoReplication(std::string& info);
void InfoKeyspace(std::string& info);
void InfoData(std::string& info);
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
};

Expand Down
58 changes: 48 additions & 10 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <sys/time.h>
#include <sys/utsname.h>
#include <sys/statvfs.h>

#include <algorithm>

Expand Down Expand Up @@ -632,6 +633,7 @@ const std::string InfoCmd::kCPUSection = "cpu";
const std::string InfoCmd::kReplicationSection = "replication";
const std::string InfoCmd::kKeyspaceSection = "keyspace";
const std::string InfoCmd::kDataSection = "data";
const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";

void InfoCmd::DoInitial() {
Expand Down Expand Up @@ -697,6 +699,8 @@ void InfoCmd::DoInitial() {
return;
} else if (strcasecmp(argv_[1].data(), kDataSection.data()) == 0) {
info_section_ = kInfoData;
} else if (strcasecmp(argv_[1].data(), kRocksDBSection.data()) == 0) {
info_section_ = kInfoRocksDB;
} else if (strcasecmp(argv_[1].data(), kDebugSection.data()) == 0) {
info_section_ = kInfoDebug;
} else {
Expand Down Expand Up @@ -724,6 +728,8 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
break;
case kInfoAll:
InfoServer(info);
Expand All @@ -741,6 +747,8 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
break;
case kInfoServer:
InfoServer(info);
Expand All @@ -766,6 +774,9 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
case kInfoData:
InfoData(info);
break;
case kInfoRocksDB:
InfoRocksDB(info);
break;
case kInfoDebug:
InfoDebug(info);
break;
Expand Down Expand Up @@ -1114,29 +1125,32 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "compression:" << g_pika_conf->compression() << "\r\n";

// rocksdb related memory usage
std::map<std::string, uint64_t> type_result;
std::map<std::string, uint64_t> background_errors;
uint64_t total_background_errors = 0;
uint64_t total_memtable_usage = 0;
uint64_t memtable_usage = 0;
uint64_t total_table_reader_usage = 0;
uint64_t table_reader_usage = 0;
std::shared_lock table_rwl(g_pika_server->tables_rw_);
for (const auto& table_item : g_pika_server->tables_) {
if (!table_item.second) {
continue;
}
std::shared_lock partition_rwl(table_item.second->partitions_rw_);
for (const auto& patition_item : table_item.second->partitions_) {
type_result.clear();
for (const auto& partition_item : table_item.second->partitions_) {
background_errors.clear();
memtable_usage = table_reader_usage = 0;
patition_item.second->DbRWLockReader();
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_MEMTABLE, &memtable_usage);
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_TABLE_READER, &table_reader_usage);
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &type_result);
patition_item.second->DbRWUnLock();
partition_item.second->DbRWLockReader();
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
partition_item.second->DbRWUnLock();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : type_result) {
for (const auto& item : background_errors) {
if (item.second != 0) {
db_fatal_msg_stream << (total_background_errors != 0 ? "," : "");
db_fatal_msg_stream << patition_item.second->GetPartitionName() << "/" << item.first;
db_fatal_msg_stream << partition_item.second->GetPartitionName() << "/" << item.first;
total_background_errors += item.second;
}
}
Expand All @@ -1153,6 +1167,30 @@ void InfoCmd::InfoData(std::string& info) {
info.append(tmp_stream.str());
}

void InfoCmd::InfoRocksDB(std::string &info) {
std::stringstream tmp_stream;

tmp_stream << "# RocksDB"
<< "\r\n";

std::shared_lock table_rwl(g_pika_server->tables_rw_);
for (const auto& table_item : g_pika_server->tables_) {
if (!table_item.second) {
continue;
}
std::shared_lock partition_rwl(table_item.second->partitions_rw_);
for (const auto& partition_item : table_item.second->partitions_) {
std::string rocksdb_info;
partition_item.second->DbRWLockReader();
partition_item.second->db()->GetRocksDBInfo(rocksdb_info);
partition_item.second->DbRWUnLock();
tmp_stream << rocksdb_info;
}
}

info.append(tmp_stream.str());
}

void InfoCmd::InfoDebug(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Synchronization Status"
Expand Down
3 changes: 3 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,9 @@ void PikaServer::InitStorageOptions() {
storage_options_.options.compression = PikaConf::GetCompression(g_pika_conf->compression());
storage_options_.options.compression_per_level = g_pika_conf->compression_per_level();

storage_options_.options.statistics = rocksdb::CreateDBStatistics();
storage_options_.options.stats_dump_period_sec = 600;

// default l0 l1 noCompression l2 and more use `compression` option
if (storage_options_.options.compression_per_level.empty() &&
storage_options_.options.compression != rocksdb::kNoCompression) {
Expand Down
5 changes: 3 additions & 2 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ namespace storage {
inline constexpr double ZSET_SCORE_MAX = std::numeric_limits<double>::max();
inline constexpr double ZSET_SCORE_MIN = std::numeric_limits<double>::lowest();

inline const std::string PROPERTY_TYPE_ROCKSDB_MEMTABLE = "rocksdb.cur-size-all-mem-tables";
inline const std::string PROPERTY_TYPE_ROCKSDB_TABLE_READER = "rocksdb.estimate-table-readers-mem";
inline const std::string PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES = "rocksdb.cur-size-all-mem-tables";
inline const std::string PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM = "rocksdb.estimate-table-readers-mem";
inline const std::string PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS = "rocksdb.background-errors";

inline const std::string ALL_DB = "all";
Expand Down Expand Up @@ -1019,6 +1019,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string &info);

private:
std::unique_ptr<RedisStrings> strings_db_;
Expand Down
42 changes: 42 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis.h"
#include <sstream>

namespace storage {

Expand Down Expand Up @@ -87,4 +88,45 @@ Status Redis::SetOptions(const OptionType& option_type, const std::unordered_map
return s;
}

void Redis::GetRocksDBInfo(std::string &info, const char *prefix) {
std::ostringstream string_stream;
string_stream << "#" << prefix << " RocksDB"
<< "\r\n";

auto write_stream_key_value=[&](const Slice& property, const char *metric) {
uint64_t value;
db_->GetAggregatedIntProperty(property, &value);
string_stream << prefix << metric << value << "\r\n";
};
write_stream_key_value(rocksdb::DB::Properties::kBackgroundErrors, "background_errors:");
write_stream_key_value(rocksdb::DB::Properties::kSizeAllMemTables, "size_all_mem_tables:");
write_stream_key_value(rocksdb::DB::Properties::kCompactionPending, "compaction_pending:");
write_stream_key_value(rocksdb::DB::Properties::kCurSizeAllMemTables, "cur_size_all_mem_tables:");
write_stream_key_value(rocksdb::DB::Properties::kCurrentSuperVersionNumber, "current_super_version_number:");
write_stream_key_value(rocksdb::DB::Properties::kMemTableFlushPending, "mem_table_flush_pending:");
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTable, "num_immutable_mem_table:");
write_stream_key_value(rocksdb::DB::Properties::kNumLiveVersions, "num_live_versions:");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningCompactions, "num_running_compactions:");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningFlushes, "num_running_flushes:");
write_stream_key_value(rocksdb::DB::Properties::kNumSnapshots, "num_snapshots:");
write_stream_key_value(rocksdb::DB::Properties::kEstimateNumKeys, "estimate_num_keys:");
write_stream_key_value(rocksdb::DB::Properties::kEstimateTableReadersMem, "estimate_table_readers_mem:");
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheUsage, "block_cache_usage:");
write_stream_key_value(rocksdb::DB::Properties::kBlockCachePinnedUsage, "block_cache_pinned_usage:");

std::map<std::string, std::string> cf_stats_map;
auto write_stream_strings_strings=[&](const char* strkey, const char *map_key) {
string_stream << prefix << strkey << ":" << cf_stats_map[map_key] << "\r\n";
};
db_->GetMapProperty(rocksdb::DB::Properties::kCFStats, &cf_stats_map);
write_stream_strings_strings("level0_numfiles_", "io_stalls.level0_numfiles");
write_stream_strings_strings("level0_slowdown_", "io_stalls.level0_slowdown");
write_stream_strings_strings("memtable_compaction_", "io_stalls.memtable_compaction");
write_stream_strings_strings("memtable_slowdown_", "io_stalls.memtable_slowdown");
write_stream_strings_strings("stop_for_pending_compaction_bytes_", "io_stalls.stop_for_pending_compaction_bytes");
write_stream_strings_strings("slowdown_for_pending_compaction_bytes_", "io_stalls.slowdown_for_pending_compaction_bytes");

info.append(string_stream.str());
}

} // namespace storage
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Redis {

Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(size_t small_compaction_threshold);
void GetRocksDBInfo(std::string &info, const char *prefix);

protected:
Storage* const storage_;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1749,4 +1749,12 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_
return s;
}

void Storage::GetRocksDBInfo(std::string &info) {
strings_db_->GetRocksDBInfo(info, "strings_");
hashes_db_->GetRocksDBInfo(info, "hashes_");
lists_db_->GetRocksDBInfo(info, "lists_");
sets_db_->GetRocksDBInfo(info, "sets_");
zsets_db_->GetRocksDBInfo(info, "zsets_");
}

} // namespace storage

0 comments on commit 006975c

Please sign in to comment.