Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into fix_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored and brother-jin committed Apr 9, 2024
2 parents 576e3f3 + 93f0576 commit fe62100
Show file tree
Hide file tree
Showing 15 changed files with 728 additions and 33 deletions.
2 changes: 0 additions & 2 deletions .github/ISSUE_TEMPLATE/1-bug-report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ body:
attributes:
label: Screenshots or videos
description: If you can, upload any screenshots of the bug.
value: |
![images](https://camo.githubusercontent.com/3f51b5a32e6e5d5adabdebc5ef968150bdabc8d17a8dc1a535b8fb255d2165d0/68747470733a2f2f67772e616c697061796f626a656374732e636f6d2f7a6f732f616e7466696e63646e2f79396b776737445643642f726570726f647563652e676966)

- type: textarea
id: environment
Expand Down
17 changes: 15 additions & 2 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_write_buffer_num_;
}
uint64_t MaxTotalWalSize() {
std::shared_lock l(rwlock_);
return max_total_wal_size_;
}
int64_t max_client_response_size() {
std::shared_lock l(rwlock_);
return max_client_response_size_;
Expand Down Expand Up @@ -530,16 +534,19 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("masterauth", value);
masterauth_ = value;
}
void SetSlotMigrate(const std::string& value) {
void SetSlotMigrate(const bool value) {
std::lock_guard l(rwlock_);
slotmigrate_ = (value == "yes");
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
slotmigrate_thread_num_ = value;
}
void SetThreadMigrateKeysNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("thread-migrate-keys-num", std::to_string(value));
thread_migrate_keys_num_ = value;
}
void SetExpireLogsNums(const int value) {
Expand Down Expand Up @@ -670,6 +677,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("max-write-buffer-num", std::to_string(value));
max_write_buffer_num_ = value;
}
void SetMaxTotalWalSize(uint64_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-total-wal-size", std::to_string(value));
max_total_wal_size_ = value;
}
void SetArenaBlockSize(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("arena-block-size", std::to_string(value));
Expand Down Expand Up @@ -765,6 +777,7 @@ class PikaConf : public pstd::BaseConf {
int64_t slotmigrate_thread_num_ = 0;
int64_t thread_migrate_keys_num_ = 0;
int64_t max_write_buffer_size_ = 0;
int64_t max_total_wal_size_ = 0;
int max_write_buffer_num_ = 0;
int min_write_buffer_number_to_merge_ = 1;
int level0_stop_writes_trigger_ = 36;
Expand Down
50 changes: 40 additions & 10 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1595,13 +1595,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)) {
if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "slotmigrate-thread-num");
EncodeNumber(&config_body, g_pika_conf->slotmigrate_thread_num());
}

if (pstd::stringmatch(pattern.data(), "thread-migrate-keys-num", 1)) {
if (pstd::stringmatch(pattern.data(), "thread-migrate-keys-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "thread-migrate-keys-num");
EncodeNumber(&config_body, g_pika_conf->thread_migrate_keys_num());
Expand Down Expand Up @@ -1780,6 +1780,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->max_write_buffer_size());
}

if (pstd::stringmatch(pattern.data(), "max-total-wal-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-total-wal-size");
EncodeNumber(&config_body, g_pika_conf->MaxTotalWalSize());
}

if (pstd::stringmatch(pattern.data(), "min-write-buffer-number-to-merge", 1) != 0) {
elements += 2;
EncodeString(&config_body, "min-write-buffer-number-to-merge");
Expand Down Expand Up @@ -2140,6 +2146,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"write-buffer-size",
"max-write-buffer-num",
"min-write-buffer-number-to-merge",
"max-total-wal-size",
"level0-slowdown-writes-trigger",
"level0-stop-writes-trigger",
"level0-file-num-compaction-trigger",
Expand Down Expand Up @@ -2172,9 +2179,6 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
} else if (set_item == "masterauth") {
g_pika_conf->SetMasterAuth(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slotmigrate") {
g_pika_conf->SetSlotMigrate(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "dump-prefix") {
g_pika_conf->SetBgsavePrefix(value);
res_.AppendStringRaw("+OK\r\n");
Expand Down Expand Up @@ -2223,19 +2227,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slotmigrate-thread-num") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n");
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slotmigrate-thread-num'\r\n");
return;
}
long int migrate_thread_num = (0 > ival || 24 < ival) ? 8 : ival;
g_pika_conf->SetSlotMigrateThreadNum(static_cast<int>(ival));
long int migrate_thread_num = (1 > ival || 24 < ival) ? 8 : ival;
g_pika_conf->SetSlotMigrateThreadNum(migrate_thread_num);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "thread-migrate-keys-num") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n");
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'thread-migrate-keys-num'\r\n");
return;
}
long int thread_migrate_keys_num = (8 > ival || 128 < ival) ? 64 : ival;
g_pika_conf->SetThreadMigrateKeysNum(static_cast<int>(ival));
g_pika_conf->SetThreadMigrateKeysNum(thread_migrate_keys_num);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-write-errorlog") {
bool is_write_errorlog;
Expand All @@ -2249,6 +2253,18 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetSlowlogWriteErrorlog(is_write_errorlog);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slotmigrate") {
bool slotmigrate;
if (value == "yes") {
slotmigrate = true;
} else if (value == "no") {
slotmigrate = false;
} else {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slotmigrate'\r\n");
return;
}
g_pika_conf->SetSlotMigrate(slotmigrate);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down Expand Up @@ -2536,6 +2552,20 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetLevel0SlowdownWritesTrigger(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");

} else if (set_item == "max-total-wal-size") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-total-wal-size'\r\n");
return;
}
std::unordered_map<std::string, std::string> options_map{{"max_total_wal_size", value}};
storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kDB, options_map);
if (!s.ok()) {
res_.AppendStringRaw("-ERR Set max-total-wal-size: " + s.ToString() + "\r\n");
return;
}
g_pika_conf->SetMaxTotalWalSize(static_cast<uint64_t>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "level0-file-num-compaction-trigger") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'level0-file-num-compaction-trigger'\r\n");
Expand Down
4 changes: 2 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBgsave, std::move(bgsaveptr)));

std::unique_ptr<Cmd> compactptr =
std::make_unique<CompactCmd>(kCmdNameCompact, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow);
std::make_unique<CompactCmd>(kCmdNameCompact, -1, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSlow | kCmdFlagsSuspend);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameCompact, std::move(compactptr)));

std::unique_ptr<Cmd> compactrangeptr = std::make_unique<CompactRangeCmd>(kCmdNameCompactRange, 5, kCmdFlagsRead | kCmdFlagsAdmin);
std::unique_ptr<Cmd> compactrangeptr = std::make_unique<CompactRangeCmd>(kCmdNameCompactRange, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameCompactRange, std::move(compactrangeptr)));
std::unique_ptr<Cmd> purgelogsto =
std::make_unique<PurgelogstoCmd>(kCmdNamePurgelogsto, -2, kCmdFlagsRead | kCmdFlagsAdmin);
Expand Down
13 changes: 6 additions & 7 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ int PikaConf::Load() {
slowlog_write_errorlog_.store(swe == "yes" ? true : false);

// slot migrate
std::string smgrt = "no";
std::string smgrt;
GetConfStr("slotmigrate", &smgrt);
slotmigrate_ = (smgrt == "yes") ? true : false;
slotmigrate_.store(smgrt == "yes" ? true : false);

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
Expand Down Expand Up @@ -306,13 +306,13 @@ int PikaConf::Load() {

// arena_block_size
GetConfInt64Human("slotmigrate-thread-num", &slotmigrate_thread_num_);
if (slotmigrate_thread_num_ < 0 || slotmigrate_thread_num_ > 24) {
if (slotmigrate_thread_num_ < 1 || slotmigrate_thread_num_ > 24) {
slotmigrate_thread_num_ = 8; // 1/8 of the write_buffer_size_
}

// arena_block_size
GetConfInt64Human("thread-migrate-keys-num", &thread_migrate_keys_num_);
if (thread_migrate_keys_num_ < 64 || thread_migrate_keys_num_ > 128) {
if (thread_migrate_keys_num_ < 8 || thread_migrate_keys_num_ > 128) {
thread_migrate_keys_num_ = 64; // 1/8 of the write_buffer_size_
}

Expand Down Expand Up @@ -716,12 +716,11 @@ int PikaConf::ConfigRewrite() {
SetConfInt("level0-file-num-compaction-trigger", level0_file_num_compaction_trigger_);
SetConfInt64("arena-block-size", arena_block_size_);
SetConfStr("slotmigrate", slotmigrate_.load() ? "yes" : "no");
SetConfInt64("slotmigrate-thread-num", slotmigrate_thread_num_);
SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
// cache config
SetConfStr("share-block-cache", share_block_cache_ ? "yes" : "no");
SetConfInt("block-size", block_size_);
SetConfInt("block-cache", block_cache_);
SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
SetConfInt("cache-model", cache_mode_);
SetConfInt("zset-cache-start-direction", zset_cache_start_direction_);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void DB::SetCompactRangeOptions(const bool is_canceled) {
}

DisplayCacheInfo DB::GetCacheInfo() {
std::lock_guard l(key_info_protector_);
std::lock_guard l(cache_info_rwlock_);
return cache_info_;
}

Expand Down
1 change: 1 addition & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,7 @@ void PikaServer::InitStorageOptions() {
storage_options_.options.arena_block_size = g_pika_conf->arena_block_size();
storage_options_.options.write_buffer_manager =
std::make_shared<rocksdb::WriteBufferManager>(g_pika_conf->max_write_buffer_size());
storage_options_.options.max_total_wal_size = g_pika_conf->MaxTotalWalSize();
storage_options_.options.max_write_buffer_number = g_pika_conf->max_write_buffer_number();
storage_options_.options.level0_file_num_compaction_trigger = g_pika_conf->level0_file_num_compaction_trigger();
storage_options_.options.level0_stop_writes_trigger = g_pika_conf->level0_stop_writes_trigger();
Expand Down
89 changes: 89 additions & 0 deletions tests/integration/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package pika_integration

import (
"context"
"time"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)

var _ = Describe("admin test", func() {
ctx := context.TODO()
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})

AfterEach(func() {
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should info", func() {
set := client.Set(ctx, "key", "foobar", 0)
Expect(set.Err()).NotTo(HaveOccurred())
Expect(set.Val()).To(Equal("OK"))

bitCount := client.BitCount(ctx, "key", nil)
Expect(bitCount.Err()).NotTo(HaveOccurred())
Expect(bitCount.Val()).To(Equal(int64(26)))

set1 := client.Set(ctx, "key1", "value1", 0)
Expect(set1.Err()).NotTo(HaveOccurred())
Expect(set1.Val()).To(Equal("OK"))

set2 := client.Set(ctx, "key2", "value2", 0)
Expect(set2.Err()).NotTo(HaveOccurred())
Expect(set2.Val()).To(Equal("OK"))

set3 := client.Set(ctx, "key3", "value3", 0)
Expect(set3.Err()).NotTo(HaveOccurred())
Expect(set3.Val()).To(Equal("OK"))

set4 := client.Set(ctx, "key4", "value4", 0)
Expect(set4.Err()).NotTo(HaveOccurred())
Expect(set4.Val()).To(Equal("OK"))

lPush := client.LPush(ctx, "key2", "b")
Expect(lPush.Err()).NotTo(HaveOccurred())

sAdd := client.SAdd(ctx, "key3", "c")
Expect(sAdd.Err()).NotTo(HaveOccurred())
Expect(sAdd.Val()).To(Equal(int64(1)))

infokeyspace := client.Info(ctx, "keyspace", "1")
Expect(infokeyspace.Err()).NotTo(HaveOccurred())
Expect(infokeyspace.Val()).NotTo(Equal(""))

start := time.Now()
infoall := client.Info(ctx, "all")
Expect(infoall.Err()).NotTo(HaveOccurred())
Expect(infoall.Val()).NotTo(Equal(""))
end := time.Now()
duration := end.Sub(start)
Expect(duration).To(BeNumerically("<", time.Second))

start1 := time.Now()
infoacache := client.Info(ctx)
Expect(infoacache.Err()).NotTo(HaveOccurred())
Expect(infoacache.Val()).NotTo(Equal(""))
end1 := time.Now()
duration1 := end1.Sub(start1)
Expect(duration1).To(BeNumerically("<", time.Second))

start2 := time.Now()
info := client.Info(ctx, "cache")
Expect(info.Err()).NotTo(HaveOccurred())
Expect(info.Val()).NotTo(Equal(""))
end2 := time.Now()
duration2 := end2.Sub(start2)
Expect(duration2).To(BeNumerically("<", time.Second))
})

})
Loading

0 comments on commit fe62100

Please sign in to comment.