Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supplement RocksDB Metrics. #1560

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ Here is an example usage of the script:

More details on [Performance](docs/benchmark/performance.md).

## Observability

## Documents
1. [Metrics](tools/pika_exporter/README.md)

1. [doc](docs/catalogue.md)
## Documents

1. [doc](https://github.com/OpenAtomFoundation/pika/wiki)

## Contact Us

Expand Down
6 changes: 5 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,12 @@ Pika与Redis的极限QPS对比。

<img src="https://deep011.github.io/public/images/pika_benchmark/pika_vs_redis_qps.png" height = "60%" width = "60%" alt="1"/>

## 可观测性

1. [指标 Metrics](tools/pika_exporter/README.md)

## 文档
1. [doc](docs/catalogue.md)
1. [doc](https://github.com/OpenAtomFoundation/pika/wiki)

## 联系方式

Expand Down
3 changes: 3 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class InfoCmd : public Cmd {
kInfoKeyspace,
kInfoLog,
kInfoData,
kInfoRocksDB,
kInfo,
kInfoAll,
kInfoDebug
Expand Down Expand Up @@ -230,6 +231,7 @@ class InfoCmd : public Cmd {
const static std::string kReplicationSection;
const static std::string kKeyspaceSection;
const static std::string kDataSection;
const static std::string kRocksDBSection;
const static std::string kDebugSection;

void DoInitial() override;
Expand All @@ -248,6 +250,7 @@ class InfoCmd : public Cmd {
void InfoReplication(std::string& info);
void InfoKeyspace(std::string& info);
void InfoData(std::string& info);
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
};

Expand Down
90 changes: 60 additions & 30 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ const std::string InfoCmd::kCPUSection = "cpu";
const std::string InfoCmd::kReplicationSection = "replication";
const std::string InfoCmd::kKeyspaceSection = "keyspace";
const std::string InfoCmd::kDataSection = "data";
const std::string InfoCmd::kRocksDBSection = "rocksdb";
const std::string InfoCmd::kDebugSection = "debug";

void InfoCmd::DoInitial() {
Expand Down Expand Up @@ -697,6 +698,8 @@ void InfoCmd::DoInitial() {
return;
} else if (strcasecmp(argv_[1].data(), kDataSection.data()) == 0) {
info_section_ = kInfoData;
} else if (strcasecmp(argv_[1].data(), kRocksDBSection.data()) == 0) {
info_section_ = kInfoRocksDB;
} else if (strcasecmp(argv_[1].data(), kDebugSection.data()) == 0) {
info_section_ = kInfoDebug;
} else {
Expand Down Expand Up @@ -724,6 +727,8 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
break;
case kInfoAll:
InfoServer(info);
Expand All @@ -741,6 +746,8 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
InfoReplication(info);
info.append("\r\n");
InfoKeyspace(info);
info.append("\r\n");
InfoRocksDB(info);
break;
case kInfoServer:
InfoServer(info);
Expand All @@ -766,6 +773,9 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
case kInfoData:
InfoData(info);
break;
case kInfoRocksDB:
InfoRocksDB(info);
break;
case kInfoDebug:
InfoDebug(info);
break;
Expand Down Expand Up @@ -811,15 +821,15 @@ void InfoCmd::InfoServer(std::string& info) {

void InfoCmd::InfoClients(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Clients\r\n";
tmp_stream << "# Clients" << "\r\n";
tmp_stream << "connected_clients:" << g_pika_server->ClientList() << "\r\n";

info.append(tmp_stream.str());
}

void InfoCmd::InfoStats(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Stats\r\n";
tmp_stream << "# Stats" << "\r\n";
tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() << "\r\n";
Expand Down Expand Up @@ -852,7 +862,7 @@ void InfoCmd::InfoCPU(std::string& info) {
getrusage(RUSAGE_SELF, &self_ru);
getrusage(RUSAGE_CHILDREN, &c_ru);
std::stringstream tmp_stream;
tmp_stream << "# CPU\r\n";
tmp_stream << "# CPU" << "\r\n";
tmp_stream << "used_cpu_sys:" << std::setiosflags(std::ios::fixed) << std::setprecision(2)
<< static_cast<float>(self_ru.ru_stime.tv_sec) + static_cast<float>(self_ru.ru_stime.tv_usec) / 1000000 << "\r\n";
tmp_stream << "used_cpu_user:" << std::setiosflags(std::ios::fixed) << std::setprecision(2)
Expand Down Expand Up @@ -1047,14 +1057,12 @@ void InfoCmd::InfoKeyspace(std::string& info) {
int32_t duration;
std::vector<storage::KeyInfo> key_infos;
std::stringstream tmp_stream;
tmp_stream << "# Keyspace\r\n";
tmp_stream << "# Keyspace" << "\r\n";

if (argv_.size() == 3) { // command => `info keyspace 1`
tmp_stream << "# Start async statistics"
<< "\r\n";
tmp_stream << "# Start async statistics" << "\r\n";
} else { // command => `info keyspace` or `info`
tmp_stream << "# Use `info keyspace 1` do async statistics"
<< "\r\n";
tmp_stream << "# Use \"info keyspace 1\" do async statistics" << "\r\n";
}

std::shared_lock rwl(g_pika_server->tables_rw_);
Expand All @@ -1070,14 +1078,11 @@ void InfoCmd::InfoKeyspace(std::string& info) {
}
tmp_stream << "# Time:" << key_scan_info.s_start_time << "\r\n";
if (duration == -2) {
tmp_stream << "# Duration: "
<< "In Waiting\r\n";
tmp_stream << "# Duration: " << "In Waiting\r\n";
} else if (duration == -1) {
tmp_stream << "# Duration: "
<< "In Processing\r\n";
tmp_stream << "# Duration: " << "In Processing\r\n";
} else if (duration >= 0) {
tmp_stream << "# Duration: " << std::to_string(duration) + "s"
<< "\r\n";
tmp_stream << "# Duration: " << std::to_string(duration) + "s" << "\r\n";
}

tmp_stream << table_name << " Strings_keys=" << key_infos[0].keys << ", expires=" << key_infos[0].expires
Expand All @@ -1104,8 +1109,7 @@ void InfoCmd::InfoData(std::string& info) {
std::stringstream db_fatal_msg_stream;

int64_t db_size = pstd::Du(g_pika_conf->db_path());
tmp_stream << "# Data"
<< "\r\n";
tmp_stream << "# Data" << "\r\n";
tmp_stream << "db_size:" << db_size << "\r\n";
tmp_stream << "db_size_human:" << (db_size >> 20) << "M\r\n";
int64_t log_size = pstd::Du(g_pika_conf->log_path());
Expand All @@ -1114,29 +1118,32 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "compression:" << g_pika_conf->compression() << "\r\n";

// rocksdb related memory usage
std::map<std::string, uint64_t> type_result;
std::map<std::string, uint64_t> background_errors;
uint64_t total_background_errors = 0;
uint64_t total_memtable_usage = 0;
uint64_t memtable_usage = 0;
uint64_t total_table_reader_usage = 0;
uint64_t table_reader_usage = 0;
std::shared_lock table_rwl(g_pika_server->tables_rw_);
for (const auto& table_item : g_pika_server->tables_) {
if (!table_item.second) {
continue;
}
std::shared_lock partition_rwl(table_item.second->partitions_rw_);
for (const auto& patition_item : table_item.second->partitions_) {
type_result.clear();
for (const auto& partition_item : table_item.second->partitions_) {
background_errors.clear();
memtable_usage = table_reader_usage = 0;
patition_item.second->DbRWLockReader();
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_MEMTABLE, &memtable_usage);
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_TABLE_READER, &table_reader_usage);
patition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &type_result);
patition_item.second->DbRWUnLock();
partition_item.second->DbRWLockReader();
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
partition_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);
partition_item.second->DbRWUnLock();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
for (const auto& item : type_result) {
for (const auto& item : background_errors) {
if (item.second != 0) {
db_fatal_msg_stream << (total_background_errors != 0 ? "," : "");
db_fatal_msg_stream << patition_item.second->GetPartitionName() << "/" << item.first;
db_fatal_msg_stream << partition_item.second->GetPartitionName() << "/" << item.first;
total_background_errors += item.second;
}
}
Expand All @@ -1153,16 +1160,39 @@ void InfoCmd::InfoData(std::string& info) {
info.append(tmp_stream.str());
}

void InfoCmd::InfoRocksDB(std::string &info) {
std::stringstream tmp_stream;

tmp_stream << "# RocksDB" << "\r\n";

std::shared_lock table_rwl(g_pika_server->tables_rw_);
for (const auto& table_item : g_pika_server->tables_) {
if (!table_item.second) {
continue;
}
std::shared_lock partition_rwl(table_item.second->partitions_rw_);
for (const auto& partition_item : table_item.second->partitions_) {
std::string rocksdb_info;
partition_item.second->DbRWLockReader();
partition_item.second->db()->GetRocksDBInfo(rocksdb_info);
partition_item.second->DbRWUnLock();
tmp_stream << rocksdb_info;
}
}

info.append(tmp_stream.str());
}

void InfoCmd::InfoDebug(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Synchronization Status"
<< "\r\n";
tmp_stream << "# Synchronization Status"<< "\r\n";

info.append(tmp_stream.str());
g_pika_rm->RmStatus(&info);

tmp_stream.str(std::string());
tmp_stream << "# Running Status "
<< "\r\n";
tmp_stream << "# Running Status " << "\r\n";

info.append(tmp_stream.str());
g_pika_server->ServerStatus(&info);
}
Expand Down
5 changes: 3 additions & 2 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ namespace storage {
inline constexpr double ZSET_SCORE_MAX = std::numeric_limits<double>::max();
inline constexpr double ZSET_SCORE_MIN = std::numeric_limits<double>::lowest();

inline const std::string PROPERTY_TYPE_ROCKSDB_MEMTABLE = "rocksdb.cur-size-all-mem-tables";
inline const std::string PROPERTY_TYPE_ROCKSDB_TABLE_READER = "rocksdb.estimate-table-readers-mem";
inline const std::string PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES = "rocksdb.cur-size-all-mem-tables";
inline const std::string PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM = "rocksdb.estimate-table-readers-mem";
inline const std::string PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS = "rocksdb.background-errors";

inline const std::string ALL_DB = "all";
Expand Down Expand Up @@ -1019,6 +1019,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string &info);

private:
std::unique_ptr<RedisStrings> strings_db_;
Expand Down
63 changes: 63 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis.h"
#include <sstream>

namespace storage {

Expand Down Expand Up @@ -87,4 +88,66 @@ Status Redis::SetOptions(const OptionType& option_type, const std::unordered_map
return s;
}

void Redis::GetRocksDBInfo(std::string &info, const char *prefix) {
std::ostringstream string_stream;
string_stream << "#" << prefix << " RocksDB" << "\r\n";

auto write_stream_key_value=[&](const Slice& property, const char *metric) {
uint64_t value;
db_->GetAggregatedIntProperty(property, &value);
string_stream << prefix << metric << ':' << value << "\r\n";
};

// memtables num
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTable, "num_immutable_mem_table");
write_stream_key_value(rocksdb::DB::Properties::kNumImmutableMemTableFlushed, "num_immutable_mem_table_flushed");
write_stream_key_value(rocksdb::DB::Properties::kMemTableFlushPending, "mem_table_flush_pending");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningFlushes, "num_running_flushes");

// compaction
write_stream_key_value(rocksdb::DB::Properties::kCompactionPending, "compaction_pending");
write_stream_key_value(rocksdb::DB::Properties::kNumRunningCompactions, "num_running_compactions");

// background errors
write_stream_key_value(rocksdb::DB::Properties::kBackgroundErrors, "background_errors");

// memtables size
write_stream_key_value(rocksdb::DB::Properties::kCurSizeActiveMemTable, "cur_size_active_mem_table");
write_stream_key_value(rocksdb::DB::Properties::kCurSizeAllMemTables, "cur_size_all_mem_tables");
write_stream_key_value(rocksdb::DB::Properties::kSizeAllMemTables, "size_all_mem_tables");

// keys
write_stream_key_value(rocksdb::DB::Properties::kEstimateNumKeys, "estimate_num_keys");

// table readers mem
write_stream_key_value(rocksdb::DB::Properties::kEstimateTableReadersMem, "estimate_table_readers_mem");

// snapshot
write_stream_key_value(rocksdb::DB::Properties::kNumSnapshots, "num_snapshots");

// version
write_stream_key_value(rocksdb::DB::Properties::kNumLiveVersions, "num_live_versions");
write_stream_key_value(rocksdb::DB::Properties::kCurrentSuperVersionNumber, "current_super_version_number");

// live data size
write_stream_key_value(rocksdb::DB::Properties::kEstimateLiveDataSize, "estimate_live_data_size");

// sst files
write_stream_key_value(rocksdb::DB::Properties::kTotalSstFilesSize, "total_sst_files_size");
write_stream_key_value(rocksdb::DB::Properties::kLiveSstFilesSize, "live_sst_files_size");

// block cache
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheCapacity, "block_cache_capacity");
write_stream_key_value(rocksdb::DB::Properties::kBlockCacheUsage, "block_cache_usage");
write_stream_key_value(rocksdb::DB::Properties::kBlockCachePinnedUsage, "block_cache_pinned_usage");

// blob files
write_stream_key_value(rocksdb::DB::Properties::kNumBlobFiles, "num_blob_files");
write_stream_key_value(rocksdb::DB::Properties::kBlobStats, "blob_stats");
write_stream_key_value(rocksdb::DB::Properties::kTotalBlobFileSize, "total_blob_file_size");
write_stream_key_value(rocksdb::DB::Properties::kLiveBlobFileSize, "live_blob_file_size");

info.append(string_stream.str());
}

} // namespace storage
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Redis {

Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(size_t small_compaction_threshold);
void GetRocksDBInfo(std::string &info, const char *prefix);

protected:
Storage* const storage_;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1749,4 +1749,12 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_
return s;
}

void Storage::GetRocksDBInfo(std::string &info) {
strings_db_->GetRocksDBInfo(info, "strings_");
hashes_db_->GetRocksDBInfo(info, "hashes_");
lists_db_->GetRocksDBInfo(info, "lists_");
sets_db_->GetRocksDBInfo(info, "sets_");
zsets_db_->GetRocksDBInfo(info, "zsets_");
}

} // namespace storage
7 changes: 6 additions & 1 deletion tools/pika_exporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

# export PATH := $(PATH):$(GOPATH)/bin

# for mac
UNAME := $(shell uname)
# for mac
BRANCH := $(shell git branch | sed 's/* \(.*\)/\1/p')
# for Linux
# BRANCH := $(shell git branch | sed --quiet 's/* \(.*\)/\1/p')
Expand Down Expand Up @@ -62,7 +63,11 @@ export TEST_COVER
all: build

build: deps
ifeq ($(UNAME), Linux)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/$(PROJNAME)
else ifeq ($(UNAME), Darwin)
CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o bin/$(PROJNAME)
endif

deps: generateVer
@mkdir -p bin
Expand Down
Loading