Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dynamically set disable_auto_compaction #2257

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ slave-priority : 100
# [NOTICE]: compact-interval is prior than compact-cron.
#compact-interval :

# The disable_auto_compactions option is [true | false]
disable_auto_compactions : false

# The minimum disk usage ratio for checking resume.
# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume.
# Its default value is 0.7.
Expand Down
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return compact_interval_;
}
bool disable_auto_compactions() {
std::shared_lock l(rwlock_);
return disable_auto_compactions_;
}
int64_t least_resume_free_disk_size() {
std::shared_lock l(rwlock_);
return least_free_disk_to_resume_;
Expand Down Expand Up @@ -538,6 +542,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("compact-interval", value);
compact_interval_ = value;
}
void SetDisableAutoCompaction(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("disable_auto_compactions", value);
disable_auto_compactions_ = value == "true";
}
void SetLeastResumeFreeDiskSize(const int64_t& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("least-free-disk-resume-size", std::to_string(value));
Expand Down Expand Up @@ -648,6 +657,7 @@ class PikaConf : public pstd::BaseConf {
int db_sync_speed_ = 0;
std::string compact_cron_;
std::string compact_interval_;
bool disable_auto_compactions_ = false;
int64_t resume_check_interval_ = 60; // seconds
int64_t least_free_disk_to_resume_ = 268435456; // 256 MB
double min_check_resume_ratio_ = 0.7;
Expand Down
21 changes: 19 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,6 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "write-binlog");
EncodeString(&config_body, g_pika_conf->write_binlog() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "binlog-file-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "binlog-file-size");
Expand Down Expand Up @@ -1856,7 +1855,11 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "compact-interval");
EncodeString(&config_body, g_pika_conf->compact_interval());
}

if (pstd::stringmatch(pattern.data(), "disable_auto_compactions", 1) != 0) {
elements += 2;
EncodeString(&config_body, "disable_auto_compactions");
EncodeString(&config_body, g_pika_conf->disable_auto_compactions() ? "true" : "false");
}
if (pstd::stringmatch(pattern.data(), "network-interface", 1) != 0) {
elements += 2;
EncodeString(&config_body, "network-interface");
Expand Down Expand Up @@ -2082,6 +2085,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
EncodeString(&ret, "db-sync-speed");
EncodeString(&ret, "compact-cron");
EncodeString(&ret, "compact-interval");
EncodeString(&ret, "disable_auto_compactions");
EncodeString(&ret, "slave-priority");
EncodeString(&ret, "sync-window-size");
// Options for storage engine
Expand Down Expand Up @@ -2218,6 +2222,19 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr<Slot> slot) {
g_pika_conf->SetSmallCompactionDurationThreshold(static_cast<int>(ival));
g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "disable_auto_compactions") {
if (value != "true" && value != "false") {
ret = "-ERR invalid disable_auto_compactions (true or false)\r\n";
return;
}
std::unordered_map<std::string, std::string> options_map{{"disable_auto_compactions", value}};
storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map);
if (!s.ok()) {
ret = "-ERR Set storage::OptionType::kColumnFamily disable_auto_compactions wrong: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetDisableAutoCompaction(value);
ret = "+OK\r\n";
} else if (set_item == "max-client-response-size") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n";
Expand Down
4 changes: 4 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ int PikaConf::Load() {
if (max_cache_statistic_keys_ <= 0) {
max_cache_statistic_keys_ = 0;
}

// disable_auto_compactions
GetConfBool("disable_auto_compactions", &disable_auto_compactions_);

small_compaction_threshold_ = 5000;
GetConfInt("small-compaction-threshold", &small_compaction_threshold_);
Expand Down Expand Up @@ -738,6 +741,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("db-sync-speed", db_sync_speed_);
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
SetConfInt64("manually-resume-interval", resume_check_interval_);
SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
Expand Down
1 change: 1 addition & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,7 @@ void PikaServer::InitStorageOptions() {
storage_options_.options.target_file_size_base = g_pika_conf->target_file_size_base();
storage_options_.options.max_background_flushes = g_pika_conf->max_background_flushes();
storage_options_.options.max_background_compactions = g_pika_conf->max_background_compactions();
storage_options_.options.disable_auto_compactions = g_pika_conf->disable_auto_compactions();
storage_options_.options.max_background_jobs = g_pika_conf->max_background_jobs();
storage_options_.options.max_open_files = g_pika_conf->max_cache_files();
storage_options_.options.max_bytes_for_level_multiplier = g_pika_conf->max_bytes_for_level_multiplier();
Expand Down
4 changes: 4 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,10 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
Status EnableDymayticOptions(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
Status EnableAutoCompaction(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);

private:
Expand Down
17 changes: 17 additions & 0 deletions src/storage/src/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ static bool strToUint32(const std::string& value, uint32_t* num, int base = 10)
return true;
}

// strToBool may throw exception
static bool strToBool(const std::string& value, bool* boolVal, int base = 10) {
if (value != "true" && value != "false") {
throw std::invalid_argument(value);
}
*boolVal = value == "true";
return true;
}

bool ParseOptionMember(const MemberType& member_type, const std::string& value, char* member_address) {
switch (member_type) {
case MemberType::kInt: {
Expand Down Expand Up @@ -71,6 +80,14 @@ bool ParseOptionMember(const MemberType& member_type, const std::string& value,
*reinterpret_cast<size_t*>(member_address) = static_cast<size_t>(uint64Val);
break;
}
case MemberType::kBool: {
bool boolVal;
if (!strToBool(value, &boolVal)) {
return false;
}
*reinterpret_cast<bool*>(member_address) = static_cast<bool>(boolVal);
break;
}
default: {
return false;
}
Expand Down
5 changes: 4 additions & 1 deletion src/storage/src/options_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ enum class MemberType {
kUint64T,
kSizeT,
kUnknown,
kBool,
};

struct MemberTypeInfo {
Expand Down Expand Up @@ -65,9 +66,11 @@ static std::unordered_map<std::string, MemberTypeInfo> mutable_cf_options_member
{offset_of(&rocksdb::ColumnFamilyOptions::soft_pending_compaction_bytes_limit), MemberType::kUint64T}},
{"hard_pending_compaction_bytes_limit",
{offset_of(&rocksdb::ColumnFamilyOptions::hard_pending_compaction_bytes_limit), MemberType::kUint64T}},
{"disable_auto_compactions",
{offset_of(&rocksdb::ColumnFamilyOptions::disable_auto_compactions), MemberType::kBool}},
};

extern bool ParseOptionMember(const MemberType& member_type, const std::string& value, char* member_address);

} // namespace storage
#endif // SRC_OPTIONS_HELPER_H
#endif // SRC_OPTIONS_HELPER_H
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class Redis {
Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
std::vector<rocksdb::ColumnFamilyHandle*> GetHandles(){ return handles_;};
void GetRocksDBInfo(std::string &info, const char *prefix);

protected:
Expand Down
57 changes: 57 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,63 @@ Status Storage::SetOptions(const OptionType& option_type, const std::string& db_
return s;
}
}
s = EnableDymayticOptions(option_type,db_type,options);
return s;
}

Status Storage::EnableDymayticOptions(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options) {
Status s;
auto it = options.find("disable_auto_compactions");
if (it != options.end() && it->second == "false") {
s = EnableAutoCompaction(option_type,db_type,options);
LOG(WARNING) << "EnableAutoCompaction " << (s.ok() ? "success" : "failed")
<< " when Options get disable_auto_compactions: " << it->second << ",db_type:" << db_type;
}
return s;
}

Status Storage::EnableAutoCompaction(const OptionType& option_type,
const std::string& db_type, const std::unordered_map<std::string, std::string>& options){
Status s;
std::vector<std::string> cfs;
std::vector<rocksdb::ColumnFamilyHandle*> cfhds;

if (db_type == ALL_DB || db_type == STRINGS_DB) {
cfhds = strings_db_->GetHandles();
s = strings_db_.get()->GetDB()->EnableAutoCompaction(cfhds);
if (!s.ok()) {
return s;
}
}
if (db_type == ALL_DB || db_type == HASHES_DB) {
cfhds = hashes_db_->GetHandles();
s = hashes_db_.get()->GetDB()->EnableAutoCompaction(cfhds);
if (!s.ok()) {
return s;
}
}
if (db_type == ALL_DB || db_type == LISTS_DB) {
cfhds = lists_db_->GetHandles();
s = lists_db_.get()->GetDB()->EnableAutoCompaction(cfhds);
if (!s.ok()) {
return s;
}
}
if (db_type == ALL_DB || db_type == ZSETS_DB) {
cfhds = zsets_db_->GetHandles();
s = zsets_db_.get()->GetDB()->EnableAutoCompaction(cfhds);
if (!s.ok()) {
return s;
}
}
if (db_type == ALL_DB || db_type == SETS_DB) {
cfhds = sets_db_->GetHandles();
s = sets_db_.get()->GetDB()->EnableAutoCompaction(cfhds);
if (!s.ok()) {
return s;
}
}
return s;
}

Expand Down
Loading