Skip to content

Commit

Permalink
feat: supplement RocksDB Metrics. (OpenAtomFoundation#1560)
Browse files Browse the repository at this point in the history
* feat: supplement RocksDB Metrics on pika info command.

Supplement RocksDB Metrics on pika info command.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* feat: parser RocksDB Metrics on pika exporter.

Parser RocksDB Metrics on pika exporter.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* feat: add RocksDB metrics in grafana.

Add RocksDB metrics in grafana.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* feat: support darwin for pika_exporter.

Support darwin for pika_exporter.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* docs: add RocksDB metrics.

Add RocksDB metrics.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* fix: unified data_type.

Unified data_type.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* fix: capture to err of convertTimeToUnix.

Capture to err of convertTimeToUnix.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* feat: add test for RocksDB metrics.

Add test for RocksDB metrics.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* fix: fix fmt.

Fix fmt.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

* docs: update readme.

Add the Metrics document link in README, and update the Doc link to wiki.

Fixes: OpenAtomFoundation#1559

Signed-off-by: yaoyinnan <[email protected]>

---------

Signed-off-by: yaoyinnan <[email protected]>
  • Loading branch information
yaoyinnan authored Jun 1, 2023
1 parent eb89246 commit 32d498d
Show file tree
Hide file tree
Showing 21 changed files with 7,132 additions and 3,913 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ Here is an example usage of the script:

More details on [Performance](docs/benchmark/performance.md).

## Observability

## Documents
1. [Metrics](tools/pika_exporter/README.md)

1. [doc](docs/catalogue.md)
## Documents

1. [doc](https://github.com/OpenAtomFoundation/pika/wiki)

## Contact Us

Expand Down
6 changes: 5 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,12 @@ Pika与Redis的极限QPS对比。
<img src="https://deep011.github.io/public/images/pika_benchmark/pika_vs_redis_qps.png" height = "60%" width = "60%" alt="1"/>
## 可观测性
1. [指标 Metrics](tools/pika_exporter/README.md)
## 文档
1. [doc](docs/catalogue.md)
1. [doc](https://github.com/OpenAtomFoundation/pika/wiki)
## 联系方式
Expand Down
3 changes: 3 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class InfoCmd : public Cmd {
kInfoKeyspace,
kInfoLog,
kInfoData,
kInfoRocksDB,
kInfo,
kInfoAll,
kInfoDebug
Expand Down Expand Up @@ -230,6 +231,7 @@ class InfoCmd : public Cmd {
const static std::string kReplicationSection;
const static std::string kKeyspaceSection;
const static std::string kDataSection;
const static std::string kRocksDBSection;
const static std::string kDebugSection;

void DoInitial() override;
Expand All @@ -248,6 +250,7 @@ class InfoCmd : public Cmd {
void InfoReplication(std::string& info);
void InfoKeyspace(std::string& info);
void InfoData(std::string& info);
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
};

Expand Down
90 changes: 60 additions & 30 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ const std::string InfoCmd::kCPUSection = "cpu";
const std::string InfoCmd::kReplicationSection = "replication";
const std::string InfoCmd::kKeyspaceSection = "keyspace";
const std::string InfoCmd::kDataSection = "data";
const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";

void InfoCmd::DoInitial() {
Expand Down Expand Up @@ -697,6 +698,8 @@ void InfoCmd::DoInitial() {
return;
} else if (strcasecmp(argv_[1].data(), kDataSection.data()) == 0) {
info_section_ = kInfoData;
} else if (strcasecmp(argv_[1].data(), kRocksDBSection.data()) == 0) {
info_section_ = kInfoRocksDB;
} else if (strcasecmp(argv_[1].data(), kDebugSection.data()) == 0) {
info_section_ = kInfoDebug;
} else {
Expand Down Expand Up @@ -724,6 +727,8 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
break;
case kInfoAll:
InfoServer(info);
Expand All @@ -741,6 +746,8 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
break;
case kInfoServer:
InfoServer(info);
Expand All @@ -766,6 +773,9 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
case kInfoData:
InfoData(info);
break;
case kInfoRocksDB:
InfoRocksDB(info);
break;
case kInfoDebug:
InfoDebug(info);
break;
Expand Down Expand Up @@ -811,15 +821,15 @@ void InfoCmd::InfoServer(std::string& info) {

void InfoCmd::InfoClients(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Clients\r\n";
tmp_stream << "# Clients" << "\r\n";
tmp_stream << "connected_clients:" << g_pika_server->ClientList() << "\r\n";

info.append(tmp_stream.str());
}

void InfoCmd::InfoStats(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Stats\r\n";
tmp_stream << "# Stats" << "\r\n";
tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() << "\r\n";
Expand Down Expand Up @@ -852,7 +862,7 @@ void InfoCmd::InfoCPU(std::string& info) {
getrusage(RUSAGE_SELF, &self_ru);
getrusage(RUSAGE_CHILDREN, &c_ru);
std::stringstream tmp_stream;
tmp_stream << "# CPU\r\n";
tmp_stream << "# CPU" << "\r\n";
tmp_stream << "used_cpu_sys:" << std::setiosflags(std::ios::fixed) << std::setprecision(2)
<< static_cast<float>(self_ru.ru_stime.tv_sec) + static_cast<float>(self_ru.ru_stime.tv_usec) / 1000000 << "\r\n";
tmp_stream << "used_cpu_user:" << std::setiosflags(std::ios::fixed) << std::setprecision(2)
Expand Down Expand Up @@ -1047,14 +1057,12 @@ void InfoCmd::InfoKeyspace(std::string& info) {
int32_t duration;
std::vector<storage::KeyInfo> key_infos;
std::stringstream tmp_stream;
tmp_stream << "# Keyspace\r\n";
tmp_stream << "# Keyspace" << "\r\n";

if (argv_.size() == 3) { // command => `info keyspace 1`
tmp_stream << "# Start async statistics"
<< "\r\n";
tmp_stream << "# Start async statistics" << "\r\n";
} else { // command => `info keyspace` or `info`
tmp_stream << "# Use `info keyspace 1` do async statistics"
<< "\r\n";
tmp_stream << "# Use \"info keyspace 1\" do async statistics" << "\r\n";
}

std::shared_lock rwl(g_pika_server->tables_rw_);
Expand All @@ -1070,14 +1078,11 @@ void InfoCmd::InfoKeyspace(std::string& info) {
}
tmp_stream << "# Time:" << key_scan_info.s_start_time << "\r\n";
if (duration == -2) {
tmp_stream << "# Duration: "
<< "In Waiting\r\n";
tmp_stream << "# Duration: " << "In Waiting\r\n";
} else if (duration == -1) {
tmp_stream << "# Duration: "
<< "In Processing\r\n";
tmp_stream << "# Duration: " << "In Processing\r\n";
} else if (duration >= 0) {
tmp_stream << "# Duration: " << std::to_string(duration) + "s"
<< "\r\n";
tmp_stream << "# Duration: " << std::to_string(duration) + "s" << "\r\n";
}

tmp_stream << table_name << " Strings_keys=" << key_infos[0].keys << ", expires=" << key_infos[0].expires
Expand All @@ -1104,8 +1109,7 @@ void InfoCmd::InfoData(std::string& info) {
std::stringstream db_fatal_msg_stream;

int64_t db_size = pstd::Du(g_pika_conf->db_path());
tmp_stream << "# Data"
<< "\r\n";
tmp_stream << "# Data" << "\r\n";
tmp_stream << "db_size:" << db_size << "\r\n";
tmp_stream << "db_size_human:" << (db_size >> 20) << "M\r\n";
int64_t log_size = pstd::Du(g_pika_conf->log_path());
Expand All @@ -1114,29 +1118,32 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "compression:" << g_pika_conf->compression() << "\r\n";

// rocksdb related memory usage
std::map<std::string, uint64_t> type_result;
std::map<std::string, uint64_t> background_errors;
uint64_t total_background_errors = 0;
uint64_t total_memtable_usage = 0;
uint64_t memtable_usage = 0;
uint64_t total_table_reader_usage = 0;
uint64_t table_reader_usage = 0;
std::shared_lock table_rwl(g_pika_server->tables_rw_);
for (const auto& table_item : g_pika_server->tables_) {
if (!table_item.second) {
continue;
}
std::shared_lock partition_rwl(table_item.second->partitions_rw_);
for (const auto& patition_item : table_item.second->partitions_) {
type_result.clear();
for (const auto& partition_item : table_item.second->partitions_) {
background_errors.clear();
memtable_usage = table_reader_usage = 0;
patition_item.second->DbRWLockReader();
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_MEMTABLE, &memtable_usage);
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_TABLE_READER, &table_reader_usage);
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &type_result);
patition_item.second->DbRWUnLock();
partition_item.second->DbRWLockReader();
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
partition_item.second->DbRWUnLock();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : type_result) {
for (const auto& item : background_errors) {
if (item.second != 0) {
db_fatal_msg_stream << (total_background_errors != 0 ? "," : "");
db_fatal_msg_stream << patition_item.second->GetPartitionName() << "/" << item.first;
db_fatal_msg_stream << partition_item.second->GetPartitionName() << "/" << item.first;
total_background_errors += item.second;
}
}
Expand All @@ -1153,16 +1160,39 @@ void InfoCmd::InfoData(std::string& info) {
info.append(tmp_stream.str());
}

void InfoCmd::InfoRocksDB(std::string &info) {
std::stringstream tmp_stream;

tmp_stream << "# RocksDB" << "\r\n";

std::shared_lock table_rwl(g_pika_server->tables_rw_);
for (const auto& table_item : g_pika_server->tables_) {
if (!table_item.second) {
continue;
}
std::shared_lock partition_rwl(table_item.second->partitions_rw_);
for (const auto& partition_item : table_item.second->partitions_) {
std::string rocksdb_info;
partition_item.second->DbRWLockReader();
partition_item.second->db()->GetRocksDBInfo(rocksdb_info);
partition_item.second->DbRWUnLock();
tmp_stream << rocksdb_info;
}
}

info.append(tmp_stream.str());
}

void InfoCmd::InfoDebug(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Synchronization Status"
<< "\r\n";
tmp_stream << "# Synchronization Status"<< "\r\n";

info.append(tmp_stream.str());
g_pika_rm->RmStatus(&info);

tmp_stream.str(std::string());
tmp_stream << "# Running Status "
<< "\r\n";
tmp_stream << "# Running Status " << "\r\n";

info.append(tmp_stream.str());
g_pika_server->ServerStatus(&info);
}
Expand Down
5 changes: 3 additions & 2 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ namespace storage {
inline constexpr double ZSET_SCORE_MAX = std::numeric_limits<double>::max();
inline constexpr double ZSET_SCORE_MIN = std::numeric_limits<double>::lowest();

inline const std::string PROPERTY_TYPE_ROCKSDB_MEMTABLE = "rocksdb.cur-size-all-mem-tables";
inline const std::string PROPERTY_TYPE_ROCKSDB_TABLE_READER = "rocksdb.estimate-table-readers-mem";
inline const std::string PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES = "rocksdb.cur-size-all-mem-tables";
inline const std::string PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM = "rocksdb.estimate-table-readers-mem";
inline const std::string PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS = "rocksdb.background-errors";

inline const std::string ALL_DB = "all";
Expand Down Expand Up @@ -1019,6 +1019,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string &info);

private:
std::unique_ptr<RedisStrings> strings_db_;
Expand Down
63 changes: 63 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis.h"
#include <sstream>

namespace storage {

Expand Down Expand Up @@ -87,4 +88,66 @@ Status Redis::SetOptions(const OptionType& option_type, const std::unordered_map
return s;
}

void Redis::GetRocksDBInfo(std::string &info, const char *prefix) {
std::ostringstream string_stream;
string_stream << "#" << prefix << " RocksDB" << "\r\n";

auto write_stream_key_value=[&](const Slice& property, const char *metric) {
uint64_t value;
db_->GetAggregatedIntProperty(property, &value);
string_stream << prefix << metric << ':' << value << "\r\n";
};

// memtables num
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTable, "num_immutable_mem_table");
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTableFlushed, "num_immutable_mem_table_flushed");
write_stream_key_value(rocksdb::DB::Properties::kMemTableFlushPending, "mem_table_flush_pending");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningFlushes, "num_running_flushes");

// compaction
write_stream_key_value(rocksdb::DB::Properties::kCompactionPending, "compaction_pending");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningCompactions, "num_running_compactions");

// background errors
write_stream_key_value(rocksdb::DB::Properties::kBackgroundErrors, "background_errors");

// memtables size
write_stream_key_value(rocksdb::DB::Properties::kCurSizeActiveMemTable, "cur_size_active_mem_table");
write_stream_key_value(rocksdb::DB::Properties::kCurSizeAllMemTables, "cur_size_all_mem_tables");
write_stream_key_value(rocksdb::DB::Properties::kSizeAllMemTables, "size_all_mem_tables");

// keys
write_stream_key_value(rocksdb::DB::Properties::kEstimateNumKeys, "estimate_num_keys");

// table readers mem
write_stream_key_value(rocksdb::DB::Properties::kEstimateTableReadersMem, "estimate_table_readers_mem");

// snapshot
write_stream_key_value(rocksdb::DB::Properties::kNumSnapshots, "num_snapshots");

// version
write_stream_key_value(rocksdb::DB::Properties::kNumLiveVersions, "num_live_versions");
write_stream_key_value(rocksdb::DB::Properties::kCurrentSuperVersionNumber, "current_super_version_number");

// live data size
write_stream_key_value(rocksdb::DB::Properties::kEstimateLiveDataSize, "estimate_live_data_size");

// sst files
write_stream_key_value(rocksdb::DB::Properties::kTotalSstFilesSize, "total_sst_files_size");
write_stream_key_value(rocksdb::DB::Properties::kLiveSstFilesSize, "live_sst_files_size");

// block cache
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheCapacity, "block_cache_capacity");
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheUsage, "block_cache_usage");
write_stream_key_value(rocksdb::DB::Properties::kBlockCachePinnedUsage, "block_cache_pinned_usage");

// blob files
write_stream_key_value(rocksdb::DB::Properties::kNumBlobFiles, "num_blob_files");
write_stream_key_value(rocksdb::DB::Properties::kBlobStats, "blob_stats");
write_stream_key_value(rocksdb::DB::Properties::kTotalBlobFileSize, "total_blob_file_size");
write_stream_key_value(rocksdb::DB::Properties::kLiveBlobFileSize, "live_blob_file_size");

info.append(string_stream.str());
}

} // namespace storage
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Redis {

Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(size_t small_compaction_threshold);
void GetRocksDBInfo(std::string &info, const char *prefix);

protected:
Storage* const storage_;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1749,4 +1749,12 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_
return s;
}

void Storage::GetRocksDBInfo(std::string &info) {
strings_db_->GetRocksDBInfo(info, "strings_");
hashes_db_->GetRocksDBInfo(info, "hashes_");
lists_db_->GetRocksDBInfo(info, "lists_");
sets_db_->GetRocksDBInfo(info, "sets_");
zsets_db_->GetRocksDBInfo(info, "zsets_");
}

} // namespace storage
7 changes: 6 additions & 1 deletion tools/pika_exporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

# export PATH := $(PATH):$(GOPATH)/bin

# for mac
UNAME := $(shell uname)
# for mac
BRANCH := $(shell git branch | sed 's/* \(.*\)/\1/p')
# for Linux
# BRANCH := $(shell git branch | sed --quiet 's/* \(.*\)/\1/p')
Expand Down Expand Up @@ -62,7 +63,11 @@ export TEST_COVER
all: build

build: deps
ifeq ($(UNAME), Linux)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/$(PROJNAME)
else ifeq ($(UNAME), Darwin)
CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o bin/$(PROJNAME)
endif

deps: generateVer
@mkdir -p bin
Expand Down
Loading

0 comments on commit 32d498d

Please sign in to comment.