From 23e6bc45b066dcaea7490dca465e9a79e3337542 Mon Sep 17 00:00:00 2001 From: chenbt Date: Fri, 29 Dec 2023 17:31:23 +0800 Subject: [PATCH 1/2] feat: Dynamically set disable_auto_compaction 1. disable_auto_compations controls whether to automatically start compat. 2. Dynamic control via config get disable_auto_compations/config set disable_auto_compations true(false). --- conf/pika.conf | 3 ++ include/pika_conf.h | 10 +++++ src/pika_admin.cc | 21 +++++++++- src/pika_conf.cc | 4 ++ src/pika_server.cc | 1 + src/storage/include/storage/storage.h | 4 ++ src/storage/src/options_helper.cc | 17 ++++++++ src/storage/src/options_helper.h | 5 ++- src/storage/src/redis.h | 1 + src/storage/src/storage.cc | 56 +++++++++++++++++++++++++++ 10 files changed, 119 insertions(+), 3 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index e902d45a1e..c5bc276a30 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -217,6 +217,9 @@ slave-priority : 100 # [NOTICE]: compact-interval is prior than compact-cron. #compact-interval : +# The disable_auto_compactions option is [true | false] +disable_auto_compactions : false + # The minimum disk usage ratio for checking resume. # If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume. # Its default value is 0.7. diff --git a/include/pika_conf.h b/include/pika_conf.h index 92ec11c74b..812ac6a501 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -87,6 +87,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return compact_interval_; } + bool disable_auto_compactions() { + std::shared_lock l(rwlock_); + return disable_auto_compactions_; + } int64_t least_resume_free_disk_size() { std::shared_lock l(rwlock_); return least_free_disk_to_resume_; @@ -538,6 +542,11 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("compact-interval", value); compact_interval_ = value; } + void SetDisableAutoCompaction(const std::string& value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("disable_auto_compactions", value); + disable_auto_compactions_ = value == "true"; + } void SetLeastResumeFreeDiskSize(const int64_t& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("least-free-disk-resume-size", std::to_string(value)); @@ -648,6 +657,7 @@ class PikaConf : public pstd::BaseConf { int db_sync_speed_ = 0; std::string compact_cron_; std::string compact_interval_; + bool disable_auto_compactions_ = false; int64_t resume_check_interval_ = 60; // seconds int64_t least_free_disk_to_resume_ = 268435456; // 256 MB double min_check_resume_ratio_ = 0.7; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index c62732358b..965a18e0e4 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1808,7 +1808,6 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "write-binlog"); EncodeString(&config_body, g_pika_conf->write_binlog() ? "yes" : "no"); } - if (pstd::stringmatch(pattern.data(), "binlog-file-size", 1) != 0) { elements += 2; EncodeString(&config_body, "binlog-file-size"); @@ -1856,7 +1855,11 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "compact-interval"); EncodeString(&config_body, g_pika_conf->compact_interval()); } - + if (pstd::stringmatch(pattern.data(), "disable_auto_compactions", 1) != 0) { + elements += 2; + EncodeString(&config_body, "disable_auto_compactions"); + EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false"); + } if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) { elements += 2; EncodeString(&config_body, "network-interface"); @@ -2082,6 +2085,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { EncodeString(&ret, "db-sync-speed"); EncodeString(&ret, "compact-cron"); EncodeString(&ret, "compact-interval"); + EncodeString(&ret, "disable_auto_compactions"); EncodeString(&ret, "slave-priority"); EncodeString(&ret, "sync-window-size"); // Options for storage engine @@ -2218,6 +2222,19 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { g_pika_conf->SetSmallCompactionDurationThreshold(static_cast(ival)); g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast(ival)); ret = "+OK\r\n"; + } else if (set_item == "disable_auto_compactions") { + if (value != "true" && value != "false") { + ret = "-ERR invalid disable_auto_compactions (true or false)\r\n"; + return; + } + std::unordered_map options_map{{"disable_auto_compactions", value}}; + storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + ret = "-ERR Set storage::OptionType::kColumnFamily disable_auto_compactions wrong: " + s.ToString() + "\r\n"; + return; + } + g_pika_conf->SetDisableAutoCompaction(value); + ret = "+OK\r\n"; } else if (set_item == "max-client-response-size") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n"; diff --git a/src/pika_conf.cc b/src/pika_conf.cc index d194363757..afa27d9e19 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -447,6 +447,9 @@ int PikaConf::Load() { if (max_cache_statistic_keys_ <= 0) { max_cache_statistic_keys_ = 0; } + + // disable_auto_compactions + GetConfBool("disable_auto_compactions", &disable_auto_compactions_); small_compaction_threshold_ = 5000; GetConfInt("small-compaction-threshold", &small_compaction_threshold_); @@ -738,6 +741,7 @@ int PikaConf::ConfigRewrite() { SetConfInt("db-sync-speed", db_sync_speed_); SetConfStr("compact-cron", compact_cron_); SetConfStr("compact-interval", compact_interval_); + SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false"); SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_); SetConfInt64("manually-resume-interval", resume_check_interval_); SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_); diff --git a/src/pika_server.cc b/src/pika_server.cc index 5cf0873ed2..a5d2468705 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1571,6 +1571,7 @@ void PikaServer::InitStorageOptions() { storage_options_.options.target_file_size_base = g_pika_conf->target_file_size_base(); storage_options_.options.max_background_flushes = g_pika_conf->max_background_flushes(); storage_options_.options.max_background_compactions = g_pika_conf->max_background_compactions(); + storage_options_.options.disable_auto_compactions = g_pika_conf->disable_auto_compactions(); storage_options_.options.max_background_jobs = g_pika_conf->max_background_jobs(); storage_options_.options.max_open_files = g_pika_conf->max_cache_files(); storage_options_.options.max_bytes_for_level_multiplier = g_pika_conf->max_bytes_for_level_multiplier(); diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index fce6d546c5..77b3e8abdd 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -1049,6 +1049,10 @@ class Storage { Status SetOptions(const OptionType& option_type, const std::string& db_type, const std::unordered_map& options); + Status EnableDymayticOptions(const OptionType& option_type, + const std::string& db_type, const std::unordered_map& options); + Status EnableAutoCompaction(const OptionType& option_type, + const std::string& db_type, const std::unordered_map& options); void GetRocksDBInfo(std::string& info); private: diff --git a/src/storage/src/options_helper.cc b/src/storage/src/options_helper.cc index b0783c35d8..a7a7c401b1 100644 --- a/src/storage/src/options_helper.cc +++ b/src/storage/src/options_helper.cc @@ -37,6 +37,15 @@ static bool strToUint32(const std::string& value, uint32_t* num, int base = 10) return true; } +// strToBool may throw exception +static bool strToBool(const std::string& value, bool* boolVal, int base = 10) { + if (value != "true" && value != "false") { + throw std::invalid_argument(value); + } + *boolVal = value == "true"; + return true; +} + bool ParseOptionMember(const MemberType& member_type, const std::string& value, char* member_address) { switch (member_type) { case MemberType::kInt: { @@ -71,6 +80,14 @@ bool ParseOptionMember(const MemberType& member_type, const std::string& value, *reinterpret_cast(member_address) = static_cast(uint64Val); break; } + case MemberType::kBool: { + bool boolVal; + if (!strToBool(value, &boolVal)) { + return false; + } + *reinterpret_cast(member_address) = static_cast(boolVal); + break; + } default: { return false; } diff --git a/src/storage/src/options_helper.h b/src/storage/src/options_helper.h index 8ef81e2434..2e81202d9c 100644 --- a/src/storage/src/options_helper.h +++ b/src/storage/src/options_helper.h @@ -18,6 +18,7 @@ enum class MemberType { kUint64T, kSizeT, kUnknown, + kBool, }; struct MemberTypeInfo { @@ -65,9 +66,11 @@ static std::unordered_map mutable_cf_options_member {offset_of(&rocksdb::ColumnFamilyOptions::soft_pending_compaction_bytes_limit), MemberType::kUint64T}}, {"hard_pending_compaction_bytes_limit", {offset_of(&rocksdb::ColumnFamilyOptions::hard_pending_compaction_bytes_limit), MemberType::kUint64T}}, + {"disable_auto_compactions", + {offset_of(&rocksdb::ColumnFamilyOptions::disable_auto_compactions), MemberType::kBool}}, }; extern bool ParseOptionMember(const MemberType& member_type, const std::string& value, char* member_address); } // namespace storage -#endif // SRC_OPTIONS_HELPER_H +#endif // SRC_OPTIONS_HELPER_H \ No newline at end of file diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index f1615e7b8f..8e9f3f0022 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -113,6 +113,7 @@ class Redis { Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold); Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold); + std::vector GetHandles(){ return handles_;}; void GetRocksDBInfo(std::string &info, const char *prefix); protected: diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index d16548b9c2..89161f1a74 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1800,6 +1800,62 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_ return s; } } + s = EnableDymayticOptions(option_type,db_type,options); + return s; +} + +Status Storage::EnableDymayticOptions(const OptionType& option_type, + const std::string& db_type, const std::unordered_map& options) { + Status s; + auto it = options.find("disable_auto_compactions"); + if (it != options.end() && it->second == "false") { + s = EnableAutoCompaction(option_type,db_type,options); + LOG(WARNING) << "EnableAutoCompaction " << (s.ok() ? "success" : "failed") << " when Options get disable_auto_compactions: " << it->second; + } + return s; +} + +Status Storage::EnableAutoCompaction(const OptionType& option_type, + const std::string& db_type, const std::unordered_map& options){ + Status s; + std::vector cfs; + std::vector cfhds; + + if (db_type == ALL_DB || db_type == STRINGS_DB) { + cfhds = strings_db_->GetHandles(); + s = strings_db_.get()->GetDB()->EnableAutoCompaction(cfhds); + if (!s.ok()) { + return s; + } + } + if (db_type == ALL_DB || db_type == HASHES_DB) { + cfhds = hashes_db_->GetHandles(); + s = hashes_db_.get()->GetDB()->EnableAutoCompaction(cfhds); + if (!s.ok()) { + return s; + } + } + if (db_type == ALL_DB || db_type == LISTS_DB) { + cfhds = lists_db_->GetHandles(); + s = lists_db_.get()->GetDB()->EnableAutoCompaction(cfhds); + if (!s.ok()) { + return s; + } + } + if (db_type == ALL_DB || db_type == ZSETS_DB) { + cfhds = zsets_db_->GetHandles(); + s = zsets_db_.get()->GetDB()->EnableAutoCompaction(cfhds); + if (!s.ok()) { + return s; + } + } + if (db_type == ALL_DB || db_type == SETS_DB) { + cfhds = sets_db_->GetHandles(); + s = sets_db_.get()->GetDB()->EnableAutoCompaction(cfhds); + if (!s.ok()) { + return s; + } + } return s; } From e29f0e752f4affc59a05f1293250bf225f2483f5 Mon Sep 17 00:00:00 2001 From: chenbt Date: Tue, 2 Jan 2024 20:37:01 +0800 Subject: [PATCH 2/2] log about db_type --- src/storage/src/storage.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 89161f1a74..641d149e20 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1810,7 +1810,8 @@ Status Storage::EnableDymayticOptions(const OptionType& option_type, auto it = options.find("disable_auto_compactions"); if (it != options.end() && it->second == "false") { s = EnableAutoCompaction(option_type,db_type,options); - LOG(WARNING) << "EnableAutoCompaction " << (s.ok() ? "success" : "failed") << " when Options get disable_auto_compactions: " << it->second; + LOG(WARNING) << "EnableAutoCompaction " << (s.ok() ? "success" : "failed") + << " when Options get disable_auto_compactions: " << it->second << ",db_type:" << db_type; } return s; }