Skip to content

Commit

Permalink
Merge branch 'unstable' into admin_thread
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored and brother-jin committed Jun 25, 2024
2 parents be96fed + 3eb2e48 commit e7fd91b
Show file tree
Hide file tree
Showing 46 changed files with 1,978 additions and 1,161 deletions.
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,21 @@ Users can directly download the latest binary version package from [releases](ht
* #### 3.1 Running with Docker
```bash
Modify the following configuration items of conf/pika.conf file:
```
log-path : /data/log/
db-path : /data/db/
db-sync-path : /data/dbsync/
dump-path : /data/dump/
```
And then execute the following statement to start pika in docker:
```bash
docker run -d \
--restart=always \
-p 9221:9221 \
-v <log_dir>:/pika/log \
-v <db_dir>:/pika/db \
-v <dump_dir>:/pika/dump \
-v <dbsync_dir>:/pika/dbsync \
-v "$(pwd)/conf":"/pika/conf" \
-v "/tmp/pika-data":"/data" \
pikadb/pika:v3.3.6
redis-cli -p 9221 "info"
Expand Down
56 changes: 45 additions & 11 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# This parameter is used to control whether to separate fast and slow commands.
# When slow-cmd-pool is set to yes, fast and slow commands are separated.
# When set to no, they are not separated.
slow-cmd-pool : no

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down Expand Up @@ -245,7 +250,8 @@ slave-priority : 100
# The disable_auto_compactions option is [true | false]
disable_auto_compactions : false

# Rocksdb max_subcompactions
# Rocksdb max_subcompactions, increasing this value can accelerate the exec speed of a single compaction task
# it's recommended to increase it's value if large compaction is found in you instance
max-subcompactions : 1
# The minimum disk usage ratio for checking resume.
# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume.
Expand Down Expand Up @@ -357,17 +363,42 @@ compression : snappy
# https://github.com/facebook/rocksdb/wiki/Compression
#compression_per_level : [none:none:snappy:lz4:lz4]

# The number of rocksdb background threads(sum of max-background-compactions and max-background-flushes)
# If max-background-jobs has a valid value AND both 'max-background-flushs' and 'max-background-compactions' is set to -1,
# then max-background-flushs' and 'max-background-compactions will be auto config by rocksdb, specifically:
# 1/4 of max-background-jobs will be given to max-background-flushs' and the rest(3/4) will be given to 'max-background-compactions'.
# 'max-background-jobs' default value is 3 and the value range is [2, 12].
max-background-jobs : 3

# The number of background flushing threads.
# max-background-flushes default value is 1 and the value range is [1, 4].
max-background-flushes : 1
# max-background-flushes default value is -1 and the value range is [1, 4] or -1.
# if 'max-background-flushes' is set to -1, the 'max-background-compactions' should also be set to -1,
# which means let rocksdb to auto config them based on the value of 'max-background-jobs'
max-background-flushes : -1

# [NOTICE] you MUST NOT set one of the max-background-flushes or max-background-compactions to -1 while setting another one to other values(not -1).
# They SHOULD both be -1 or both not(if you want to config them manually).

# The number of background compacting threads.
# max-background-compactions default value is 2 and the value range is [1, 8].
max-background-compactions : 2
# max-background-compactions default value is -1 and the value range is [1, 8] or -1.
# if 'max-background-compactions' is set to -1, the 'max-background-flushes' should also be set to -1,
# which means let rocksdb to auto config them based on the value of 'max-background-jobs'
max-background-compactions : -1

# RocksDB delayed-write-rate, default is 0(infer from rate-limiter by RocksDB)
# Ref from rocksdb: Whenever stall conditions are triggered, RocksDB will reduce write rate to delayed_write_rate,
# and could possibly reduce write rate to even lower than delayed_write_rate if estimated pending compaction bytes accumulates.
# If the value is 0, RcoksDB will infer a value from `rater_limiter` value if it is not empty, or 16MB if `rater_limiter` is empty.
# Note that if users change the rate in `rate_limiter` after DB is opened, delayed_write_rate won't be adjusted.
# [Support Dynamically changeable] send 'config set delayed-write-rate' to a running pika can change it's value dynamically
delayed-write-rate : 0


# RocksDB will try to limit number of bytes in one compaction to be lower than this max-compaction-bytes.
# But it's NOT guaranteed.
# default value is -1, means let it be 25 * target-file-size-base (Which is RocksDB's default value)
max-compaction-bytes : -1

# The number of background threads.
# max-background-jobs default value is 3 and the value range is [2, 12].
max-background-jobs : 3

# maximum value of RocksDB cached open file descriptors
max-cache-files : 5000
Expand Down Expand Up @@ -433,14 +464,17 @@ default-slot-num : 1024
# 0: Read 1: Write 2: ReadAndWrite
# rate-limiter-mode : default 1

# rate limiter bandwidth, default 2000MB/s
#rate-limiter-bandwidth : 2097152000
# rate limiter bandwidth, units in bytes, default 1024GB/s (No limit)
# [Support Dynamically changeable] send 'rate-limiter-bandwidth' to a running pika can change it's value dynamically
#rate-limiter-bandwidth : 1099511627776

#rate-limiter-refill-period-us : 100000
#
#rate-limiter-fairness: 10

# rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is false.
# if auto_tuned is true: Enables dynamic adjustment of rate limit within the range
#`[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O.
# rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is true.
#rate-limiter-auto-tuned : true

################################## RocksDB Blob Configure #####################
Expand Down
2 changes: 0 additions & 2 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ class PikaClientProcessor {
int Start();
void Stop();
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();
size_t ThreadPoolMaxQueueSize();

private:
std::unique_ptr<net::ThreadPool> pool_;
std::vector<std::unique_ptr<net::BGThread>> bg_threads_;
};
#endif // PIKA_CLIENT_PROCESSOR_H_
46 changes: 43 additions & 3 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slotmigrate_;
}
bool slow_cmd_pool() {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -259,6 +263,12 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return target_file_size_base_;
}

uint64_t max_compaction_bytes() {
std::shared_lock l(rwlock_);
return static_cast<uint64_t>(max_compaction_bytes_);
}

int max_cache_statistic_keys() {
std::shared_lock l(rwlock_);
return max_cache_statistic_keys_;
Expand All @@ -283,6 +293,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_background_jobs_;
}
uint64_t delayed_write_rate(){
std::shared_lock l(rwlock_);
return static_cast<uint64_t>(delayed_write_rate_);
}
int max_cache_files() {
std::shared_lock l(rwlock_);
return max_cache_files_;
Expand Down Expand Up @@ -583,6 +597,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlowCmdPool(const bool value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no");
slow_cmd_pool_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
Expand Down Expand Up @@ -732,6 +751,24 @@ class PikaConf : public pstd::BaseConf {
arena_block_size_ = value;
}

void SetRateLmiterBandwidth(int64_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("rate-limiter-bandwidth", std::to_string(value));
rate_limiter_bandwidth_ = value;
}

void SetDelayedWriteRate(int64_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("delayed-write-rate", std::to_string(value));
delayed_write_rate_ = value;
}

void SetMaxCompactionBytes(int64_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-compaction-bytes", std::to_string(value));
max_compaction_bytes_ = value;
}

void SetLogLevel(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("loglevel", value);
Expand Down Expand Up @@ -854,6 +891,7 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_path_;
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;

std::string compression_;
std::string compression_per_level_;
Expand All @@ -872,9 +910,10 @@ class PikaConf : public pstd::BaseConf {
int max_cache_statistic_keys_ = 0;
int small_compaction_threshold_ = 0;
int small_compaction_duration_threshold_ = 0;
int max_background_flushes_ = 1;
int max_background_compactions_ = 2;
int max_background_flushes_ = -1;
int max_background_compactions_ = -1;
int max_background_jobs_ = 0;
int64_t delayed_write_rate_ = 0;
int max_cache_files_ = 0;
std::atomic<uint64_t> rocksdb_ttl_second_ = 0;
std::atomic<uint64_t> rocksdb_periodic_second_ = 0;
Expand Down Expand Up @@ -918,6 +957,7 @@ class PikaConf : public pstd::BaseConf {
//
bool write_binlog_ = false;
int target_file_size_base_ = 0;
int64_t max_compaction_bytes_ = 0;
int binlog_file_size_ = 0;

// cache
Expand Down Expand Up @@ -952,7 +992,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_mutex rwlock_;

// Rsync Rate limiting configuration
int throttle_bytes_per_second_ = 207200000;
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;
};
Expand Down
2 changes: 2 additions & 0 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,8 @@ class PKSetexAtCmd : public Cmd {
return res;
}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKSetexAtCmd(*this); }
Expand Down
3 changes: 3 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class PikaReplBgWorker {
void QueueClear();
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
Expand Down
40 changes: 22 additions & 18 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class PikaServer : public pstd::noncopyable {
bool force_full_sync();
void SetForceFullSync(bool v);
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
Expand Down Expand Up @@ -170,7 +171,6 @@ class PikaServer : public pstd::noncopyable {
void FinishMetaSync();
bool MetaSyncDone();
void ResetMetaSyncStatus();
void SetLoopDBStateMachine(bool need_loop);
int GetMetaSyncTimestamp();
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
Expand All @@ -180,8 +180,8 @@ class PikaServer : public pstd::noncopyable {
/*
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_monitor_cmd);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();
Expand Down Expand Up @@ -310,8 +310,7 @@ class PikaServer : public pstd::noncopyable {
bool SlotsMigrateBatch(const std::string &ip, int64_t port, int64_t time_out, int64_t slots, int64_t keys_num, const std::shared_ptr<DB>& db);
void GetSlotsMgrtSenderStatus(std::string *ip, int64_t* port, int64_t *slot, bool *migrating, int64_t *moved, int64_t *remained);
bool SlotsMigrateAsyncCancel();
std::shared_mutex bgsave_protector_;
BgSaveInfo bgsave_info_;
std::shared_mutex bgslots_protector_;

/*
* BGSlotsReload used
Expand All @@ -337,28 +336,28 @@ class PikaServer : public pstd::noncopyable {
BGSlotsReload bgslots_reload_;

BGSlotsReload bgslots_reload() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
return bgslots_reload_;
}
bool GetSlotsreloading() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
return bgslots_reload_.reloading;
}
void SetSlotsreloading(bool reloading) {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_reload_.reloading = reloading;
}
void SetSlotsreloadingCursor(int64_t cursor) {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_reload_.cursor = cursor;
}
int64_t GetSlotsreloadingCursor() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
return bgslots_reload_.cursor;
}

void SetSlotsreloadingEndTime() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_reload_.end_time = time(nullptr);
}
void Bgslotsreload(const std::shared_ptr<DB>& db);
Expand Down Expand Up @@ -399,33 +398,33 @@ class PikaServer : public pstd::noncopyable {
net::BGThread bgslots_cleanup_thread_;

BGSlotsCleanup bgslots_cleanup() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
return bgslots_cleanup_;
}
bool GetSlotscleaningup() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
return bgslots_cleanup_.cleaningup;
}
void SetSlotscleaningup(bool cleaningup) {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cleaningup = cleaningup;
}
void SetSlotscleaningupCursor(int64_t cursor) {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cursor = cursor;
}
void SetCleanupSlots(std::vector<int> cleanup_slots) {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cleanup_slots.swap(cleanup_slots);
}
std::vector<int> GetCleanupSlots() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
return bgslots_cleanup_.cleanup_slots;
}

void Bgslotscleanup(std::vector<int> cleanup_slots, const std::shared_ptr<DB>& db);
void StopBgslotscleanup() {
std::lock_guard ml(bgsave_protector_);
std::lock_guard ml(bgslots_protector_);
bgslots_cleanup_.cleaningup = false;
std::vector<int> cleanup_slots;
bgslots_cleanup_.cleanup_slots.swap(cleanup_slots);
Expand Down Expand Up @@ -645,6 +644,11 @@ class PikaServer : public pstd::noncopyable {
* acl
*/
std::unique_ptr<::Acl> acl_ = nullptr;

/*
* fast and slow thread pools
*/
bool slow_cmd_thread_pool_flag_;
};

#endif
1 change: 1 addition & 0 deletions src/net/include/backend_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class BackendThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
pstd::Status Write(int fd, const std::string& msg);
pstd::Status Close(int fd);
// Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK
Expand Down
Loading

0 comments on commit e7fd91b

Please sign in to comment.