diff --git a/conf/pika.conf b/conf/pika.conf index 7f91a580ca..e902d45a1e 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -268,6 +268,7 @@ max-cache-statistic-keys : 0 # a small compact is triggered automatically if the small compaction feature is enabled. # small-compaction-threshold default value is 5000 and the value range is [1, 100000]. small-compaction-threshold : 5000 +small-compaction-duration-threshold : 10000 # The maximum total size of all live memtables of the RocksDB instance that owned by Pika. # Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB diff --git a/include/pika_binlog.h b/include/pika_binlog.h index c2b4f9f650..6a1e8aa1ca 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable { void debug() { std::shared_lock l(rwlock_); - printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_); + printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_); } private: @@ -63,12 +63,8 @@ class Binlog : public pstd::noncopyable { // Need to hold Lock(); pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index); - uint64_t file_size() { return file_size_; } - std::string filename() { return filename_; } - bool IsBinlogIoError() { return binlog_io_error_; } - // need to hold mutex_ void SetTerm(uint32_t term) { std::lock_guard l(version_->rwlock_); @@ -85,11 +81,9 @@ class Binlog : public pstd::noncopyable { private: pstd::Status Put(const char* item, int len); + pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset); static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len); - // pstd::WritableFile *queue() { return queue_; } - void InitLogFile(); - pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset); /* * Produce @@ -109,8 +103,6 @@ class Binlog : public pstd::noncopyable { int block_offset_ = 0; - char* pool_ = nullptr; - bool exit_all_consume_ = false; const std::string binlog_path_; uint64_t file_size_ = 0; @@ -118,8 +110,6 @@ class Binlog : public pstd::noncopyable { std::string filename_; std::atomic binlog_io_error_; - // Not use - // int32_t retry_; }; #endif diff --git a/include/pika_cache.h b/include/pika_cache.h index d976932a27..0566798529 100644 --- a/include/pika_cache.h +++ b/include/pika_cache.h @@ -18,6 +18,7 @@ #include "cache/include/cache.h" #include "storage/storage.h" +class PikaCacheLoadThread; class ZIncrbyCmd; class ZRangebyscoreCmd; class ZRevrangebyscoreCmd; @@ -45,10 +46,8 @@ struct CacheInfo { } }; -class PikaCacheLoadThread; class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this { public: - PikaCache(int zset_cache_start_pos, int zset_cache_field_num_per_key); ~PikaCache(); @@ -60,24 +59,21 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< int CacheStatus(void); void ClearHitRatio(void); // Normal Commands - void Info(CacheInfo &info); - long long DbSize(void); + void Info(CacheInfo& info); bool Exists(std::string& key); void FlushDB(void); - void ActiveExpireCycle(); void ProcessCronTask(void); - rocksdb::Status Del(const std::vector &keys); + rocksdb::Status Del(const std::vector& keys); rocksdb::Status Expire(std::string& key, int64_t ttl); rocksdb::Status Expireat(std::string& key, int64_t ttl); rocksdb::Status TTL(std::string& key, int64_t* ttl); rocksdb::Status Persist(std::string& key); rocksdb::Status Type(std::string& key, std::string* value); - rocksdb::Status RandomKey(std::string *key); + rocksdb::Status RandomKey(std::string* key); // String Commands rocksdb::Status Set(std::string& key, std::string& value, int64_t ttl); - rocksdb::Status SetWithoutTTL(std::string& key, std::string& value); rocksdb::Status Setnx(std::string& key, std::string& value, int64_t ttl); rocksdb::Status SetnxWithoutTTL(std::string& key, std::string& value); rocksdb::Status Setxx(std::string& key, std::string& value, int64_t ttl); @@ -138,7 +134,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< rocksdb::Status SAddnx(std::string& key, std::vector& members, int64_t ttl); rocksdb::Status SAddnxWithoutTTL(std::string& key, std::vector& members); rocksdb::Status SCard(std::string& key, uint64_t* len); - rocksdb::Status SIsmember(std::string& key, std::string &member); + rocksdb::Status SIsmember(std::string& key, std::string& member); rocksdb::Status SMembers(std::string& key, std::vector* members); rocksdb::Status SRem(std::string& key, std::vector& members); rocksdb::Status SRandmember(std::string& key, int64_t count, std::vector* members); @@ -151,12 +147,12 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< rocksdb::Status ZCard(std::string& key, uint32_t* len, const std::shared_ptr& db); rocksdb::Status ZCount(std::string& key, std::string& min, std::string& max, uint64_t* len, ZCountCmd* cmd); rocksdb::Status ZIncrby(std::string& key, std::string& member, double increment); - rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string &member, double increment, ZIncrbyCmd* cmd); + rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd); rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop, std::vector* score_members, const std::shared_ptr& db); rocksdb::Status ZRangebyscore(std::string& key, std::string& min, std::string& max, std::vector* score_members, ZRangebyscoreCmd* cmd); - rocksdb::Status ZRank(std::string& key, std::string &member, int64_t* rank, const std::shared_ptr& db); + rocksdb::Status ZRank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr& db); rocksdb::Status ZRem(std::string& key, std::vector& members, std::shared_ptr db); rocksdb::Status ZRemrangebyrank(std::string& key, std::string& min, std::string& max, int32_t ele_deleted = 0, const std::shared_ptr& db = nullptr); @@ -167,8 +163,8 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this< std::vector* score_members, ZRevrangebyscoreCmd* cmd); rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, const std::shared_ptr& db); - rocksdb::Status ZRevrank(std::string& key, std::string &member, int64_t *rank, const std::shared_ptr& db); - rocksdb::Status ZScore(std::string& key, std::string &member, double *score, const std::shared_ptr& db); + rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr& db); + rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr& db); rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector* members, const std::shared_ptr& db); rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len, const std::shared_ptr& db); diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 1223f696b2..0df2d51dbd 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -52,7 +52,6 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr resp_ptr; LogOffset offset; std::string db_name; - uint32_t slot_id; }; struct TxnStateBitMask { @@ -89,33 +88,30 @@ class PikaClientConn : public net::RedisConn { void BatchExecRedisCmd(const std::vector& argvs); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); - static void DoExecTask(void* arg); bool IsPubSub() { return is_pubsub_; } void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; } void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; } - const std::string& GetCurrentTable() override { return current_db_; } void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); } + const std::string& GetCurrentTable() override { return current_db_; } // Txn - void PushCmdToQue(std::shared_ptr cmd); std::queue> GetTxnCmdQue(); + void PushCmdToQue(std::shared_ptr cmd); void ClearTxnCmdQue(); - bool IsInTxn(); - bool IsTxnFailed(); - bool IsTxnInitFailed(); - bool IsTxnWatchFailed(); - bool IsTxnExecing(void); void SetTxnWatchFailState(bool is_failed); void SetTxnInitFailState(bool is_failed); void SetTxnStartState(bool is_start); - - void AddKeysToWatch(const std::vector &db_keys); + void AddKeysToWatch(const std::vector& db_keys); void RemoveWatchedKeys(); - void SetTxnFailedFromKeys(const std::vector &db_keys); + void SetTxnFailedFromKeys(const std::vector& db_keys); void SetAllTxnFailed(); void SetTxnFailedFromDBs(std::string db_name); void ExitTxn(); + bool IsInTxn(); + bool IsTxnInitFailed(); + bool IsTxnWatchFailed(); + bool IsTxnExecing(void); net::ServerThread* server_thread() { return server_thread_; } diff --git a/include/pika_command.h b/include/pika_command.h index 9e6badd1c3..c5185094dc 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -77,6 +77,7 @@ const std::string kCmdNameSlotsMgrtSlotAsync = "slotsmgrtslot-async"; const std::string kCmdNameSlotsMgrtExecWrapper = "slotsmgrt-exec-wrapper"; const std::string kCmdNameSlotsMgrtAsyncStatus = "slotsmgrt-async-status"; const std::string kCmdNameSlotsMgrtAsyncCancel = "slotsmgrt-async-cancel"; + // Kv const std::string kCmdNameSet = "set"; const std::string kCmdNameGet = "get"; @@ -230,22 +231,19 @@ const std::string kCmdNameUnSubscribe = "unsubscribe"; const std::string kCmdNamePubSub = "pubsub"; const std::string kCmdNamePSubscribe = "psubscribe"; const std::string kCmdNamePUnSubscribe = "punsubscribe"; - const std::string kClusterPrefix = "pkcluster"; + using PikaCmdArgsType = net::RedisCmdArgsType; static const int RAW_ARGS_LEN = 1024 * 1024; enum CmdFlagsMask { kCmdFlagsMaskRW = 1, - kCmdFlagsMaskType = 30, kCmdFlagsMaskLocal = 32, kCmdFlagsMaskSuspend = 64, - kCmdFlagsMaskPrior = 128, kCmdFlagsMaskAdminRequire = 256, kCmdFlagsMaskDoThrouhDB = 4096, kCmdFlagsMaskReadCache = 128, kCmdFlagsMaskUpdateCache = 2048, - kCmdFlagsMaskSlot = 1536, }; enum CmdFlags { diff --git a/include/pika_conf.h b/include/pika_conf.h index 9eb489d29c..a0966e00b4 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -220,6 +220,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return small_compaction_threshold_; } + int small_compaction_duration_threshold() { + std::shared_lock l(rwlock_); + return small_compaction_duration_threshold_; + } int max_background_flushes() { std::shared_lock l(rwlock_); return max_background_flushes_; @@ -425,6 +429,11 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("small-compaction-threshold", std::to_string(value)); small_compaction_threshold_ = value; } + void SetSmallCompactionDurationThreshold(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("small-compaction-duration-threshold", std::to_string(value)); + small_compaction_duration_threshold_ = value; + } void SetMaxClientResponseSize(const int value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-client-response-size", std::to_string(value)); @@ -477,7 +486,7 @@ class PikaConf : public pstd::BaseConf { } void SetSlotMigrate(const std::string &value) { std::lock_guard l(rwlock_); - slotmigrate_ = (value == "yes") ? true : false; + slotmigrate_ = (value == "yes"); } void SetExpireLogsNums(const int value) { std::lock_guard l(rwlock_); @@ -602,32 +611,31 @@ class PikaConf : public pstd::BaseConf { max_rsync_parallel_num_ = value; } + int64_t cache_maxmemory() { return cache_maxmemory_; } void SetCacheType(const std::string &value); void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; } int zset_cache_start_pos() { return zset_cache_start_pos_; } int zset_cache_field_num_per_key() { return zset_cache_field_num_per_key_; } - int64_t cache_maxmemory() { return cache_maxmemory_; } int cache_maxmemory_policy() { return cache_maxmemory_policy_; } int cache_maxmemory_samples() { return cache_maxmemory_samples_; } int cache_lfu_decay_time() { return cache_lfu_decay_time_; } - int Load(); int ConfigRewrite(); int ConfigRewriteReplicationID(); - private: + private: int port_ = 0; - std::string slaveof_; int slave_priority_ = 0; int thread_num_ = 0; int thread_pool_size_ = 0; int sync_thread_num_ = 0; + int expire_dump_days_ = 3; + int db_sync_speed_ = 0; + std::string slaveof_; std::string log_path_; std::string log_level_; std::string db_path_; std::string db_sync_path_; - int expire_dump_days_ = 3; - int db_sync_speed_ = 0; std::string compact_cron_; std::string compact_interval_; int64_t resume_check_interval_ = 60; // seconds @@ -675,6 +683,7 @@ class PikaConf : public pstd::BaseConf { int max_cache_statistic_keys_ = 0; int small_compaction_threshold_ = 0; + int small_compaction_duration_threshold_ = 0; int max_background_flushes_ = 0; int max_background_compactions_ = 0; int max_background_jobs_ = 0; @@ -712,10 +721,11 @@ class PikaConf : public pstd::BaseConf { int binlog_file_size_ = 0; // cache + std::vector cache_type_; + std::atomic_bool tmp_cache_disable_flag_; + std::atomic_int64_t cache_maxmemory_; std::atomic_int cache_num_; std::atomic_int cache_model_; - std::atomic_bool tmp_cache_disable_flag_; - std::vector cache_type_; std::atomic_int cache_string_; std::atomic_int cache_set_; std::atomic_int cache_zset_; @@ -724,25 +734,22 @@ class PikaConf : public pstd::BaseConf { std::atomic_int cache_bit_; std::atomic_int zset_cache_start_pos_; std::atomic_int zset_cache_field_num_per_key_; - std::atomic_int64_t cache_maxmemory_; std::atomic_int cache_maxmemory_policy_; std::atomic_int cache_maxmemory_samples_; std::atomic_int cache_lfu_decay_time_; - // rocksdb blob bool enable_blob_files_ = false; - int64_t min_blob_size_ = 4096; // 4K - int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M - std::string blob_compression_type_ = "none"; bool enable_blob_garbage_collection_ = false; double blob_garbage_collection_age_cutoff_ = 0.25; double blob_garbage_collection_force_threshold_ = 1.0; + int64_t min_blob_size_ = 4096; // 4K int64_t blob_cache_ = 0; int64_t blob_num_shard_bits_ = 0; + int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M + std::string blob_compression_type_ = "none"; std::unique_ptr local_meta_; - std::shared_mutex rwlock_; // Rsync Rate limiting configuration diff --git a/include/pika_consensus.h b/include/pika_consensus.h index 6c68aa249c..8cd23ee1b3 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -21,7 +21,6 @@ class Context : public pstd::noncopyable { pstd::Status Init(); // RWLock should be held when access members. pstd::Status StableSave(); - void PrepareUpdateAppliedIndex(const LogOffset& offset); void UpdateAppliedIndex(const LogOffset& offset); void Reset(const LogOffset& offset); @@ -79,7 +78,6 @@ class MemLog { logs_.push_back(item); last_offset_ = item.offset; } - pstd::Status PurgeLogs(const LogOffset& offset, std::vector* logs); pstd::Status TruncateTo(const LogOffset& offset); void Reset(const LogOffset& offset); @@ -136,11 +134,6 @@ class ConsensusCoordinator { return committed_index_; } - LogOffset applied_index() { - std::shared_lock lock(context_->rwlock_); - return context_->applied_index_; - } - std::shared_ptr context() { return context_; } // redis parser cb @@ -203,7 +196,6 @@ class ConsensusCoordinator { uint32_t term_ = 0; std::string db_name_; - uint32_t slot_id_ = 0; SyncProgress sync_pros_; std::shared_ptr stable_logger_; diff --git a/include/pika_data_distribution.h b/include/pika_data_distribution.h index ead040881e..7f8d494fe0 100644 --- a/include/pika_data_distribution.h +++ b/include/pika_data_distribution.h @@ -17,14 +17,12 @@ class PikaDataDistribution { virtual ~PikaDataDistribution() = default; // Initialization virtual void Init() = 0; - virtual uint32_t Distribute(const std::string& str) = 0; }; class HashModulo : public PikaDataDistribution { public: ~HashModulo() override = default; void Init() override; - uint32_t Distribute(const std::string& str) override; }; #endif diff --git a/include/pika_db.h b/include/pika_db.h index ed431fe6f2..c7828a5722 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -91,7 +91,6 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { friend class PikaServer; std::string GetDBName(); - bool FlushSubDB(const std::string& db_name); std::shared_ptr storage() const; void GetBgSaveMetaData(std::vector* fileNames, std::string* snapshot_uuid); void BgSaveDB(); @@ -122,17 +121,11 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { void StopKeyScan(); void ScanDatabase(const storage::DataType& type); KeyScanInfo GetKeyScanInfo(); - pstd::Status GetSlotsKeyScanInfo(std::map* infos); // Compact use; void Compact(const storage::DataType& type); void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end); - void LeaveAllSlot(); - std::set GetSlotIDs(); - bool DBIsEmpty(); - pstd::Status MovetoToTrash(const std::string& path); - pstd::Status Leave(); std::shared_ptr LockMgr(); void DbRWLockWriter(); void DbRWLockReader(); @@ -150,7 +143,7 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { * FlushDB & FlushSubDB use */ bool FlushDB(); - //bool FlushSubDB(const std::string& db_name); + bool FlushSubDB(const std::string& db_name); bool FlushDBWithoutLock(); bool FlushSubDBWithoutLock(const std::string& db_name); bool ChangeDb(const std::string& new_path); @@ -158,12 +151,11 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { void PrepareRsync(); bool IsBgSaving(); BgSaveInfo bgsave_info(); - pstd::Status GetKeyNum(std::vector* key_info); + private: bool opened_ = false; std::string dbsync_path_; - std::shared_mutex slots_rw_; std::string db_name_; std::string db_path_; std::string snapshot_uuid_; diff --git a/include/pika_define.h b/include/pika_define.h index 3e68b75879..3e47a933ad 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -34,7 +34,6 @@ class PikaServer; /* Port shift */ const int kPortShiftRSync = 1000; const int kPortShiftReplServer = 2000; -//TODO: Temporarily used for rsync server port shift. will be deleted. const int kPortShiftRsync2 = 10001; const std::string kPikaPidFile = "pika.pid"; const std::string kPikaSecretFile = "rsync.secret"; @@ -53,15 +52,6 @@ struct DBStruct { std::string db_name; }; -struct WorkerCronTask { - int task; - std::string ip_port; -}; -using MonitorCronTask = WorkerCronTask; -// task define -#define TASK_KILL 0 -#define TASK_KILLALL 1 - // slave item struct SlaveItem { std::string ip_port; @@ -91,7 +81,6 @@ const std::string ReplStateMsg[] = {"kNoConnect", "kTryConnect", "kTryDBSync", " enum DBState { INFREE = 0, - INBUSY = 1, }; struct LogicOffset { @@ -265,7 +254,7 @@ class RmNode : public Node { void SetSessionId(int32_t session_id) { session_id_ = session_id; } int32_t SessionId() const { return session_id_; } std::string ToString() const { - return "slot=" + DBName() + "_,ip_port=" + Ip() + ":" + + return "db=" + DBName() + "_,ip_port=" + Ip() + ":" + std::to_string(Port()) + ",session id=" + std::to_string(SessionId()); } void SetLastSendTime(uint64_t last_send_time) { last_send_time_ = last_send_time; } @@ -328,12 +317,6 @@ constexpr int PIKA_CACHE_READ = 1; #define PIKA_CACHE_SIZE_MIN 536870912 // 512M #define PIKA_CACHE_SIZE_DEFAULT 10737418240 // 10G -/* - * The size of Binlogfile - */ -// static uint64_t kBinlogSize = 128; -// static const uint64_t kBinlogSize = 1024 * 1024 * 100; - enum RecordType { kZeroType = 0, kFullType = 1, @@ -371,13 +354,11 @@ const std::string kContext = "context"; /* * define common character - * */ #define COMMA ',' /* * define reply between master and slave - * */ const std::string kInnerReplOk = "ok"; const std::string kInnerReplWait = "wait"; @@ -411,6 +392,7 @@ const int PIKA_CACHE_STATUS_DESTROY = 4; const int PIKA_CACHE_STATUS_CLEAR = 5; const int CACHE_START_FROM_BEGIN = 0; const int CACHE_START_FROM_END = -1; + /* * key type */ @@ -420,7 +402,6 @@ const char PIKA_KEY_TYPE_LIST = 'l'; const char PIKA_KEY_TYPE_SET = 's'; const char PIKA_KEY_TYPE_ZSET = 'z'; - /* * cache task type */ diff --git a/include/pika_geo.h b/include/pika_geo.h index 6ab441eb54..f495f01275 100644 --- a/include/pika_geo.h +++ b/include/pika_geo.h @@ -5,6 +5,7 @@ #ifndef PIKA_GEO_H_ #define PIKA_GEO_H_ + #include "include/pika_db.h" #include "include/pika_command.h" #include "storage/storage.h" diff --git a/include/pika_geohash.h b/include/pika_geohash.h index 8a184eae4b..1ba348515e 100644 --- a/include/pika_geohash.h +++ b/include/pika_geohash.h @@ -43,7 +43,7 @@ extern "C" { #define RANGEISZERO(r) (!(r).max && !(r).min) #define RANGEPISZERO(r) ((r) == nullptr || RANGEISZERO(*(r))) -#define GEO_STEP_MAX 26 /* 26*2 = 52 bits. */ +#define GEO_STEP_MAX 26 /* 26 * 2 = 52 bits. */ /* Limits from EPSG:900913 / EPSG:3785 / OSGEO:41001 */ constexpr double GEO_LAT_MIN{-85.05112878}; diff --git a/include/pika_hash.h b/include/pika_hash.h index fdfdc7e031..8b08d7ce35 100644 --- a/include/pika_hash.h +++ b/include/pika_hash.h @@ -5,10 +5,10 @@ #ifndef PIKA_HASH_H_ #define PIKA_HASH_H_ -#include "include/pika_db.h" -#include "storage/storage.h" #include "include/pika_command.h" +#include "include/pika_db.h" +#include "storage/storage.h" /* * hash diff --git a/include/pika_kv.h b/include/pika_kv.h index 3b380b1d0a..f8232f222a 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -28,6 +28,7 @@ class SetCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetCmd(*this); } + private: std::string key_; std::string value_; diff --git a/include/pika_list.h b/include/pika_list.h index 403ed395d4..76aeda4969 100644 --- a/include/pika_list.h +++ b/include/pika_list.h @@ -85,7 +85,6 @@ class LLenCmd : public Cmd { class BlockingBaseCmd : public Cmd { public: BlockingBaseCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - //blpop/brpop used start struct WriteBinlogOfPopArgs{ BlockKeyType block_type; @@ -300,6 +299,7 @@ class BRPopCmd final : public BlockingBaseCmd { virtual Cmd* Clone() override { return new BRPopCmd(*this); } void DoInitial() override; void DoBinlog(const std::shared_ptr& db) override; + private: std::vector keys_; int64_t expire_time_{0}; diff --git a/include/pika_monotonic_time.h b/include/pika_monotonic_time.h new file mode 100644 index 0000000000..909fadfaec --- /dev/null +++ b/include/pika_monotonic_time.h @@ -0,0 +1,20 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_MONOTONIC_TIME_H +#define PIKA_MONOTONIC_TIME_H + +#include + +/* A counter in micro-seconds. The 'monotime' type is provided for variables + * holding a monotonic time. This will help distinguish & document that the + * variable is associated with the monotonic clock and should not be confused + * with other types of time.*/ +using monotime = uint64_t; + +// Get monotonic time in microseconds +monotime getMonotonicUs(); + +#endif // PIKA_MONOTONIC_TIME_H \ No newline at end of file diff --git a/include/pika_repl_client.h b/include/pika_repl_client.h index dd150b4ea5..2389f35978 100644 --- a/include/pika_repl_client.h +++ b/include/pika_repl_client.h @@ -36,7 +36,7 @@ struct ReplClientWriteBinlogTaskArg { std::shared_ptr conn; void* res_private_data; PikaReplBgWorker* worker; - ReplClientWriteBinlogTaskArg(const std::shared_ptr& _res, + ReplClientWriteBinlogTaskArg(const std::shared_ptr& _res, const std::shared_ptr& _conn, void* _res_private_data, PikaReplBgWorker* _worker) : res(_res), conn(_conn), res_private_data(_res_private_data), worker(_worker) {} diff --git a/include/pika_repl_client_conn.h b/include/pika_repl_client_conn.h index 6477f007a2..b9143e9962 100644 --- a/include/pika_repl_client_conn.h +++ b/include/pika_repl_client_conn.h @@ -38,13 +38,6 @@ class PikaReplClientConn : public net::PbConn { private: // dispatch binlog by its table_name + slot void DispatchBinlogRes(const std::shared_ptr& response); - - struct ReplRespArg { - std::shared_ptr resp; - std::shared_ptr conn; - ReplRespArg(std::shared_ptr _resp, std::shared_ptr _conn) - : resp(std::move(_resp)), conn(std::move(_conn)) {} - }; }; #endif diff --git a/include/pika_repl_client_thread.h b/include/pika_repl_client_thread.h index 92c29ee0a2..fe8213b090 100644 --- a/include/pika_repl_client_thread.h +++ b/include/pika_repl_client_thread.h @@ -18,7 +18,6 @@ class PikaReplClientThread : public net::ClientThread { public: PikaReplClientThread(int cron_interval, int keepalive_timeout); ~PikaReplClientThread() override = default; - int Start(); private: class ReplClientConnFactory : public net::ConnFactory { @@ -36,11 +35,6 @@ class PikaReplClientThread : public net::ClientThread { void FdTimeoutHandle(int fd, const std::string& ip_port) const override; void FdClosedHandle(int fd, const std::string& ip_port) const override; bool AccessHandle(std::string& ip) const override { - // ban 127.0.0.1 if you want to test this routine - // if (ip.find("127.0.0.2") != std::string::npos) { - // std::cout << "AccessHandle " << ip << std::endl; - // return false; - // } return true; } int CreateWorkerSpecificData(void** data) const override { return 0; } diff --git a/include/pika_repl_server.h b/include/pika_repl_server.h index 68daf1ee4d..4a12f99cb9 100644 --- a/include/pika_repl_server.h +++ b/include/pika_repl_server.h @@ -32,10 +32,10 @@ class PikaReplServer { int Stop(); pstd::Status SendSlaveBinlogChips(const std::string& ip, int port, const std::vector& tasks); - void BuildBinlogOffset(const LogOffset& offset, InnerMessage::BinlogOffset* boffset); - void BuildBinlogSyncResp(const std::vector& tasks, InnerMessage::InnerResponse* resp); pstd::Status Write(const std::string& ip, int port, const std::string& msg); + void BuildBinlogOffset(const LogOffset& offset, InnerMessage::BinlogOffset* boffset); + void BuildBinlogSyncResp(const std::vector& tasks, InnerMessage::InnerResponse* resp); void Schedule(net::TaskFunc func, void* arg); void UpdateClientConnMap(const std::string& ip_port, int fd); void RemoveClientConn(int fd); @@ -44,7 +44,6 @@ class PikaReplServer { private: std::unique_ptr server_tp_ = nullptr; std::unique_ptr pika_repl_server_thread_ = nullptr; - std::shared_mutex client_conn_rwlock_; std::map client_conn_map_; }; diff --git a/include/pika_repl_server_thread.h b/include/pika_repl_server_thread.h index 013dd869a0..c4e356839b 100644 --- a/include/pika_repl_server_thread.h +++ b/include/pika_repl_server_thread.h @@ -14,12 +14,8 @@ class PikaReplServerThread : public net::HolyThread { public: PikaReplServerThread(const std::set& ips, int port, int cron_interval); ~PikaReplServerThread() override = default; - int ListenPort(); - // for ProcessBinlogData use - uint64_t GetnPlusSerial() { return serial_++; } - private: class ReplServerConnFactory : public net::ConnFactory { public: diff --git a/include/pika_rm.h b/include/pika_rm.h index 35686c902c..326e55d265 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -35,9 +35,7 @@ class SyncDB { public: SyncDB(const std::string& db_name); virtual ~SyncDB() = default; - DBInfo& SyncDBInfo() { return db_info_; } - std::string DBName(); protected: @@ -49,33 +47,25 @@ class SyncMasterDB : public SyncDB { SyncMasterDB(const std::string& db_name); pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id); pstd::Status RemoveSlaveNode(const std::string& ip, int port); - pstd::Status ActivateSlaveBinlogSync(const std::string& ip, int port, const LogOffset& offset); pstd::Status ActivateSlaveDbSync(const std::string& ip, int port); - pstd::Status SyncBinlogToWq(const std::string& ip, int port); - pstd::Status GetSlaveSyncBinlogInfo(const std::string& ip, int port, BinlogOffset* sent_offset, BinlogOffset* acked_offset); pstd::Status GetSlaveState(const std::string& ip, int port, SlaveState* slave_state); - pstd::Status SetLastRecvTime(const std::string& ip, int port, uint64_t time); - pstd::Status GetSafetyPurgeBinlog(std::string* safety_purge); - bool BinlogCloudPurge(uint32_t index); - pstd::Status WakeUpSlaveBinlogSync(); pstd::Status CheckSyncTimeout(uint64_t now); - + pstd::Status GetSlaveNodeSession(const std::string& ip, int port, int32_t* session); + void GetValidSlaveNames(std::vector* slavenames); int GetNumberOfSlaveNode(); + bool BinlogCloudPurge(uint32_t index); bool CheckSlaveNodeExist(const std::string& ip, int port); - pstd::Status GetSlaveNodeSession(const std::string& ip, int port, int32_t* session); - void GetValidSlaveNames(std::vector* slavenames); // display use pstd::Status GetInfo(std::string* info); // debug use std::string ToStringStatus(); - int32_t GenSessionId(); bool CheckSessionId(const std::string& ip, int port, const std::string& db_name, int session_id); @@ -112,45 +102,31 @@ class SyncMasterDB : public SyncDB { pstd::Mutex session_mu_; int32_t session_id_ = 0; - ConsensusCoordinator coordinator_; }; class SyncSlaveDB : public SyncDB { public: SyncSlaveDB(const std::string& db_name); - void Activate(const RmNode& master, const ReplState& repl_state); void Deactivate(); - void SetLastRecvTime(uint64_t time); - void SetReplState(const ReplState& repl_state); ReplState State(); - pstd::Status CheckSyncTimeout(uint64_t now); // For display pstd::Status GetInfo(std::string* info); // For debug std::string ToStringStatus(); - + std::string LocalIp(); + int32_t MasterSessionId(); const std::string& MasterIp(); - int MasterPort(); - void SetMasterSessionId(int32_t session_id); - - int32_t MasterSessionId(); - void SetLocalIp(const std::string& local_ip); - - std::string LocalIp(); - void StopRsync(); - void ActivateRsync(); - bool IsRsyncRunning() {return rsync_cli_->IsRunning();} private: @@ -165,14 +141,10 @@ class PikaReplicaManager { public: PikaReplicaManager(); ~PikaReplicaManager() = default; - friend Cmd; - void Start(); void Stop(); - bool CheckMasterSyncFinished(); - pstd::Status ActivateSyncSlaveDB(const RmNode& node, const ReplState& repl_state); // For Pika Repl Client Thread @@ -181,7 +153,7 @@ class PikaReplicaManager { pstd::Status SendTrySyncRequest(const std::string& db_name); pstd::Status SendDBSyncRequest(const std::string& db_name); pstd::Status SendBinlogSyncAckRequest(const std::string& table, const LogOffset& ack_start, - const LogOffset& ack_end, bool is_first_send = false); + const LogOffset& ack_end, bool is_first_send = false); pstd::Status CloseReplClientConn(const std::string& ip, int32_t port); // For Pika Repl Server Thread @@ -199,26 +171,20 @@ class PikaReplicaManager { // To check slot info // For pkcluster info command - - void FindCompleteReplica(std::vector* replica); + static bool CheckSlaveDBState(const std::string& ip, int port); void FindCommonMaster(std::string* master); - pstd::Status CheckDBRole(const std::string& table, int* role); - void RmStatus(std::string* debug_info); - - static bool CheckSlaveDBState(const std::string& ip, int port); - + pstd::Status CheckDBRole(const std::string& table, int* role); pstd::Status LostConnection(const std::string& ip, int port); // Update binlog win and try to send next binlog pstd::Status UpdateSyncBinlogStatus(const RmNode& slave, const LogOffset& offset_start, const LogOffset& offset_end); - pstd::Status WakeUpBinlogSync(); // write_queue related void ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector& tasks); - int ConsumeWriteQueue(); void DropItemInWriteQueue(const std::string& ip, int port); + int ConsumeWriteQueue(); // Schedule Task void ScheduleReplServerBGTask(net::TaskFunc func, void* arg); @@ -227,11 +193,11 @@ class PikaReplicaManager { const std::shared_ptr& res, const std::shared_ptr& conn, void* res_private_data); void ScheduleWriteDBTask(const std::shared_ptr& cmd_ptr, const LogOffset& offset, const std::string& db_name); - void ReplServerRemoveClientConn(int fd); void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd); std::shared_mutex& GetDBLock() { return dbs_rw_; } + void DBLock() { dbs_rw_.lock(); } @@ -255,9 +221,9 @@ class PikaReplicaManager { std::unordered_map, hash_db_info> sync_slave_dbs_; pstd::Mutex write_queue_mu_; - // every host owns a queue, the key is "ip+port" - std::unordered_map>> write_queues_; + // every host owns a queue, the key is "ip + port" + std::unordered_map>> write_queues_; std::unique_ptr pika_repl_client_; std::unique_ptr pika_repl_server_; }; diff --git a/include/pika_server.h b/include/pika_server.h index 807859bb8a..bb891a652b 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -86,7 +86,6 @@ class PikaServer : public pstd::noncopyable { * Server init info */ bool ServerInit(); - void Start(); void Exit(); @@ -107,17 +106,17 @@ class PikaServer : public pstd::noncopyable { void SetForceFullSync(bool v); void SetDispatchQueueLimit(int queue_limit); storage::StorageOptions storage_options(); - uint64_t getMonotonicUs(); + /* - * Table use + * DB use */ void InitDBStruct(); - std::shared_ptr GetDB(const std::string& db_name); bool IsBgSaving(); bool IsKeyScaning(); bool IsCompacting(); bool IsDBExist(const std::string& db_name); bool IsDBBinlogIoError(const std::string& db_name); + std::shared_ptr GetDB(const std::string& db_name); pstd::Status DoSameThingSpecificDB(const std::set& dbs, const TaskArg& arg); std::shared_mutex& GetDBLock() { return dbs_rw_; @@ -174,8 +173,6 @@ class PikaServer : public pstd::noncopyable { void FinishMetaSync(); bool MetaSyncDone(); void ResetMetaSyncStatus(); - bool AllSlotConnectSuccess(); - bool LoopSlotStateMachine(); void SetLoopDBStateMachine(bool need_loop); int GetMetaSyncTimestamp(); void UpdateMetaSyncTimestamp(); @@ -242,22 +239,21 @@ class PikaServer : public pstd::noncopyable { */ void SlowlogTrim(); void SlowlogReset(); - uint32_t SlowlogLen(); void SlowlogObtain(int64_t number, std::vector* slowlogs); void SlowlogPushEntry(const std::vector& argv, int64_t time, int64_t duration); + uint32_t SlowlogLen(); /* * Statistic used */ - void ResetStat(); uint64_t ServerQueryNum(); uint64_t ServerCurrentQps(); uint64_t accumulative_connections(); + void ResetStat(); void incr_accumulative_connections(); void ResetLastSecQuerynum(); void UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command, bool is_write); std::unordered_map ServerExecCountDB(); - QpsStatistic ServerDBStat(const std::string& db_name); std::unordered_map ServerAllDBStat(); /* @@ -291,25 +287,23 @@ class PikaServer : public pstd::noncopyable { std::vector>* result); void PubSubChannels(const std::string& pattern, std::vector* result); void PubSubNumSub(const std::vector& channels, std::vector>* result); - pstd::Status GetCmdRouting(std::vector& redis_cmds, std::vector* dst, bool* all_local); // info debug use void ServerStatus(std::string* info); /* - * * Async migrate used + * Async migrate used */ int SlotsMigrateOne(const std::string& key, const std::shared_ptr &db); bool SlotsMigrateBatch(const std::string &ip, int64_t port, int64_t time_out, int64_t slots, int64_t keys_num, const std::shared_ptr& db); void GetSlotsMgrtSenderStatus(std::string *ip, int64_t *port, int64_t *slot, bool *migrating, int64_t *moved, int64_t *remained); bool SlotsMigrateAsyncCancel(); - std::shared_mutex bgsave_protector_; BgSaveInfo bgsave_info_; /* - * BGSlotsReload used + * BGSlotsReload used */ struct BGSlotsReload { bool reloading = false; @@ -358,7 +352,6 @@ class PikaServer : public pstd::noncopyable { } void Bgslotsreload(const std::shared_ptr& db); - /* * BGSlotsCleanup used */ @@ -388,7 +381,6 @@ class PikaServer : public pstd::noncopyable { BGSlotsCleanup bgslots_cleanup_; net::BGThread bgslots_cleanup_thread_; - BGSlotsCleanup bgslots_cleanup() { std::lock_guard ml(bgsave_protector_); return bgslots_cleanup_; @@ -432,7 +424,6 @@ class PikaServer : public pstd::noncopyable { */ std::unordered_map* GetCommandStatMap(); - /* * Instantaneous Metric used */ @@ -459,21 +450,23 @@ class PikaServer : public pstd::noncopyable { std::unique_ptr conf; bool reenable_cache; }; + /* * Cache used */ + static void DoCacheBGTask(void* arg); void ResetCacheAsync(uint32_t cache_num, std::shared_ptr db, cache::CacheConfig *cache_cfg = nullptr); void ClearCacheDbAsync(std::shared_ptr db); void ClearCacheDbAsyncV2(std::shared_ptr db); void ResetCacheConfig(std::shared_ptr db); void ClearHitRatio(std::shared_ptr db); void OnCacheStartPosChanged(int zset_cache_start_pos, std::shared_ptr db); - static void DoCacheBGTask(void* arg); void UpdateCacheInfo(void); void ResetDisplayCacheInfo(int status, std::shared_ptr db); void CacheConfigInit(cache::CacheConfig &cache_cfg); void ProcessCronTask(); double HitRatio(); + private: /* * TimingTask use @@ -497,7 +490,7 @@ class PikaServer : public pstd::noncopyable { std::timed_mutex exit_mutex_; /* - * Table used + * DB used */ std::atomic db_state_; std::shared_mutex dbs_rw_; @@ -597,10 +590,9 @@ class PikaServer : public pstd::noncopyable { Statistic statistic_; /* - * Info Commandstats used - */ + * Info Commandstats used + */ std::unordered_map cmdstat_map_; - net::BGThread common_bg_thread_; /* diff --git a/include/pika_slaveping_thread.h b/include/pika_slaveping_thread.h index 35b726fb12..a79200782e 100644 --- a/include/pika_slaveping_thread.h +++ b/include/pika_slaveping_thread.h @@ -33,10 +33,8 @@ class PikaSlavepingThread : public net::Thread { private: int64_t sid_ = 0; bool is_first_send_ = true; - int sockfd_ = -1; net::NetCli* cli_ = nullptr; - virtual void* ThreadMain(); }; diff --git a/include/pika_slot_command.h b/include/pika_slot_command.h index d5218b0b0b..405b07d449 100644 --- a/include/pika_slot_command.h +++ b/include/pika_slot_command.h @@ -22,13 +22,13 @@ extern uint32_t CRC32CheckSum(const char* buf, int len); int GetSlotID(const std::string &str); int GetKeyType(const std::string& key, std::string& key_type, const std::shared_ptr& db); +int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr& db); +int GetSlotsID(const std::string& str, uint32_t* pcrc, int* phastag); void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr& db); void RemSlotKey(const std::string& key, const std::shared_ptr& db); -int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr& db); +void RemSlotKeyByType(const std::string& type, const std::string& key, const std::shared_ptr& db); std::string GetSlotKey(int slot); std::string GetSlotsTagKey(uint32_t crc); -int GetSlotsID(const std::string& str, uint32_t* pcrc, int* phastag); -void RemSlotKeyByType(const std::string& type, const std::string& key, const std::shared_ptr& db); class PikaMigrate { public: @@ -53,16 +53,13 @@ class PikaMigrate { private: std::map migrate_clients_; pstd::Mutex mutex_; - void KillMigrateClient(net::NetCli* migrate_cli); void KillAllMigrateClient(); - + int64_t TTLByType(const char key_type, const std::string& key, const std::shared_ptr& db); int MigrateSend(net::NetCli* migrate_cli, const std::string& key, const char type, std::string& detail, const std::shared_ptr& db); bool MigrateRecv(net::NetCli* migrate_cli, int need_receive, std::string& detail); - int ParseKey(const std::string& key, const char type, std::string& wbuf_str, const std::shared_ptr& db); - int64_t TTLByType(const char key_type, const std::string& key, const std::shared_ptr& db); int ParseKKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr& db); int ParseZKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr& db); int ParseSKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr& db); @@ -78,13 +75,13 @@ class SlotsMgrtTagSlotCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagSlotCmd(*this); } + private: std::string dest_ip_; int64_t dest_port_ = 0; int64_t timeout_ms_ = 60; int64_t slot_id_ = 0; std::basic_string, std::allocator> key_; - void DoInitial() override; }; @@ -95,6 +92,7 @@ class SlotsMgrtTagSlotAsyncCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagSlotAsyncCmd(*this); } + private: std::string dest_ip_; int64_t dest_port_ = 0; @@ -103,7 +101,6 @@ class SlotsMgrtTagSlotAsyncCmd : public Cmd { int64_t max_bytes_ = 0; int64_t slot_id_ = 0; int64_t keys_num_ = 0; - void DoInitial() override; }; @@ -114,6 +111,7 @@ class SlotsMgrtTagOneCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagOneCmd(*this); } + private: std::string dest_ip_; int64_t dest_port_ = 0; @@ -144,6 +142,7 @@ class SlotsInfoCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsInfoCmd(*this); } + private: void DoInitial() override; @@ -158,6 +157,7 @@ class SlotsMgrtAsyncCancelCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtAsyncCancelCmd(*this); } + private: void DoInitial() override; }; @@ -169,6 +169,7 @@ class SlotsDelCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsDelCmd(*this); } + private: std::vector slots_; void DoInitial() override; @@ -181,6 +182,7 @@ class SlotsHashKeyCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsHashKeyCmd(*this); } + private: std::vector keys_; void DoInitial() override; @@ -193,6 +195,7 @@ class SlotsScanCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsScanCmd(*this); } + private: std::string key_; std::string pattern_ = "*"; @@ -217,6 +220,7 @@ class SlotsMgrtExecWrapperCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtExecWrapperCmd(*this); } + private: std::string key_; std::vector args; @@ -231,6 +235,7 @@ class SlotsReloadCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsReloadCmd(*this); } + private: void DoInitial() override; }; @@ -242,6 +247,7 @@ class SlotsReloadOffCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsReloadOffCmd(*this); } + private: void DoInitial() override; }; @@ -254,6 +260,7 @@ class SlotsCleanupCmd : public Cmd { void Merge() override {}; Cmd* Clone() override { return new SlotsCleanupCmd(*this); } std::vector cleanup_slots_; + private: void DoInitial() override; }; @@ -265,6 +272,7 @@ class SlotsCleanupOffCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsCleanupOffCmd(*this); } + private: void DoInitial() override; }; diff --git a/include/pika_statistic.h b/include/pika_statistic.h index c6d03d2e3e..dcfe97d652 100644 --- a/include/pika_statistic.h +++ b/include/pika_statistic.h @@ -16,9 +16,7 @@ class QpsStatistic { QpsStatistic(); QpsStatistic(const QpsStatistic& other); ~QpsStatistic() = default; - void IncreaseQueryNum(bool is_write); - void ResetLastSecQuerynum(); std::atomic querynum; diff --git a/include/pika_transaction.h b/include/pika_transaction.h index 2c8f3594ba..623392af69 100644 --- a/include/pika_transaction.h +++ b/include/pika_transaction.h @@ -32,6 +32,7 @@ class ExecCmd : public Cmd { void Split(std::shared_ptr db, const HintKeys& hint_keys) override {} void Merge() override {} std::vector current_key() const override { return {}; } + private: struct CmdInfo { public: @@ -63,6 +64,7 @@ class DiscardCmd : public Cmd { Cmd* Clone() override { return new DiscardCmd(*this); } void Split(std::shared_ptr db, const HintKeys& hint_keys) override {} void Merge() override {} + private: void DoInitial() override; }; @@ -70,13 +72,13 @@ class DiscardCmd : public Cmd { class WatchCmd : public Cmd { public: WatchCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr db) override; void Execute() override; void Split(std::shared_ptr db, const HintKeys& hint_keys) override {} Cmd* Clone() override { return new WatchCmd(*this); } void Merge() override {} std::vector current_key() const override { return keys_; } + private: void DoInitial() override; std::vector keys_; @@ -86,11 +88,11 @@ class WatchCmd : public Cmd { class UnwatchCmd : public Cmd { public: UnwatchCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr db) override; Cmd* Clone() override { return new UnwatchCmd(*this); } void Split(std::shared_ptr db, const HintKeys& hint_keys) override {} void Merge() override {} + private: void DoInitial() override; }; diff --git a/include/pika_zset.h b/include/pika_zset.h index 863840f987..2d1770bcc6 100644 --- a/include/pika_zset.h +++ b/include/pika_zset.h @@ -8,7 +8,6 @@ #include "include/pika_command.h" #include "pika_kv.h" -#include "storage/storage.h" /* * zset @@ -160,13 +159,13 @@ class ZRevrangeCmd : public ZsetRangeParentCmd { class ZsetRangebyscoreParentCmd : public Cmd { public: ZsetRangebyscoreParentCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - double MinScore() { return min_score_; } double MaxScore() { return max_score_; } bool LeftClose() { return left_close_; } bool RightClose() { return right_close_; } int64_t Offset() { return offset_; } int64_t Count() { return count_; } + protected: std::string key_; std::string min_, max_; @@ -285,7 +284,6 @@ class ZsetUIstoreParentCmd : public Cmd { ZsetUIstoreParentCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) { zadd_cmd_ = std::make_unique(kCmdNameZAdd, -4, kCmdFlagsWrite | kCmdFlagsSingleDB | kCmdFlagsZset); - //del_cmd_ = std::make_unique(kCmdNameDel, -2, kCmdFlagsWrite | kCmdFlagsMultiDB | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache); } ZsetUIstoreParentCmd(const ZsetUIstoreParentCmd& other) : Cmd(other), @@ -295,12 +293,12 @@ class ZsetUIstoreParentCmd : public Cmd { keys_(other.keys_), weights_(other.weights_) { zadd_cmd_ = std::make_unique(kCmdNameZAdd, -4, kCmdFlagsWrite | kCmdFlagsSingleDB | kCmdFlagsZset); - //del_cmd_ = std::make_unique(kCmdNameDel, -2, kCmdFlagsWrite | kCmdFlagsMultiDB | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache); } std::vector current_key() const override { return {dest_key_}; } + protected: std::string dest_key_; int64_t num_keys_ = 0; @@ -311,7 +309,6 @@ class ZsetUIstoreParentCmd : public Cmd { void Clear() override { aggregate_ = storage::SUM; } //used for write binlog std::shared_ptr zadd_cmd_; - std::shared_ptr del_cmd_; }; class ZUnionstoreCmd : public ZsetUIstoreParentCmd { diff --git a/include/rsync_client.h b/include/rsync_client.h index f13e967885..e2dcbea6ef 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -43,7 +43,6 @@ class WaitObjectManager; using pstd::Status; - class RsyncClient : public net::Thread { public: enum State { @@ -81,7 +80,6 @@ class RsyncClient : public net::Thread { private: typedef std::unique_ptr NetThreadUPtr; - std::map meta_table_; std::set file_set_; std::string snapshot_uuid_; @@ -104,7 +102,7 @@ class RsyncClient : public net::Thread { }; class RsyncWriter { -public: + public: RsyncWriter(const std::string& filepath) { filepath_ = filepath; fd_ = open(filepath.c_str(), O_RDWR | O_APPEND | O_CREAT, 0644); @@ -138,13 +136,13 @@ class RsyncWriter { return Status::OK(); } -private: + private: std::string filepath_; int fd_ = -1; }; class WaitObject { -public: + public: WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {} ~WaitObject() {} @@ -182,7 +180,7 @@ class WaitObject { std::string Filename() {return filename_;} RsyncService::Type Type() {return type_;} size_t Offset() {return offset_;} -private: + private: std::string filename_; RsyncService::Type type_; size_t offset_ = 0xFFFFFFFF; @@ -192,7 +190,7 @@ class WaitObject { }; class WaitObjectManager { -public: + public: WaitObjectManager() { wo_vec_.resize(kMaxRsyncParallelNum); for (int i = 0; i < kMaxRsyncParallelNum; i++) { @@ -228,7 +226,7 @@ class WaitObjectManager { wo_vec_[index]->WakeUp(resp); } -private: + private: std::vector wo_vec_; std::mutex mu_; }; diff --git a/include/rsync_server.h b/include/rsync_server.h index 17d32fde5a..5e8abd7493 100644 --- a/include/rsync_server.h +++ b/include/rsync_server.h @@ -115,6 +115,7 @@ class RsyncReader { *is_eof = (offset + copy_count == total_size_); return pstd::Status::OK(); } + private: pstd::Status Seek(const std::string filepath, const size_t offset) { if (filepath == filepath_ && offset >= start_offset_ && offset < end_offset_) { diff --git a/include/throttle.h b/include/throttle.h index cb0dc7d638..2bdbe6ed71 100644 --- a/include/throttle.h +++ b/include/throttle.h @@ -15,25 +15,25 @@ extern std::unique_ptr g_pika_conf; namespace rsync { class Throttle { public: - Throttle() {} - Throttle(size_t throttle_throughput_bytes, size_t check_cycle); - ~Throttle(); - size_t ThrottledByThroughput(size_t bytes); - void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us); - static Throttle& GetInstance() { - static Throttle instance(g_pika_conf->throttle_bytes_per_second(), 10); - return instance; - } + Throttle() {} + Throttle(size_t throttle_throughput_bytes, size_t check_cycle); + ~Throttle(); + size_t ThrottledByThroughput(size_t bytes); + void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us); + static Throttle& GetInstance() { + static Throttle instance(g_pika_conf->throttle_bytes_per_second(), 10); + return instance; + } private: - std::atomic throttle_throughput_bytes_ = 100 * 1024 * 1024; - // the num of tasks doing install_snapshot - std::atomic last_throughput_check_time_us_; - std::atomic cur_throughput_bytes_; - // user defined check cycles of throughput per second - size_t check_cycle_ = 10; - pstd::Mutex keys_mutex_; - size_t caculate_check_time_us_(int64_t current_time_us, int64_t check_cycle) { + std::atomic throttle_throughput_bytes_ = 100 * 1024 * 1024; + // the num of tasks doing install_snapshot + std::atomic last_throughput_check_time_us_; + std::atomic cur_throughput_bytes_; + // user defined check cycles of throughput per second + size_t check_cycle_ = 10; + pstd::Mutex keys_mutex_; + size_t caculate_check_time_us_(int64_t current_time_us, int64_t check_cycle) { size_t base_aligning_time_us = 1000 * 1000 / check_cycle; return current_time_us / base_aligning_time_us * base_aligning_time_us; } diff --git a/src/build_version.cc.in b/src/build_version.cc.in index 8e2de924a2..0898e05783 100644 --- a/src/build_version.cc.in +++ b/src/build_version.cc.in @@ -3,8 +3,5 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "include/build_version.h" -const char* pika_build_git_sha = - "pika_git_sha:@PIKA_GIT_SHA@"; -const char* pika_build_git_date = "pika_build_git_date:@PIKA_GIT_DATE@"; +const char* pika_build_git_sha = "pika_git_sha:@PIKA_GIT_SHA@"; const char* pika_build_compile_date = "pika_build_date:@PIKA_BUILD_DATE@"; diff --git a/src/cache/include/cache.h b/src/cache/include/cache.h index d5d5fcf576..37a5e5a3b8 100644 --- a/src/cache/include/cache.h +++ b/src/cache/include/cache.h @@ -109,7 +109,7 @@ class RedisCache { // Set Commands Status SAdd(std::string& key, std::vector &members); Status SCard(std::string& key, uint64_t *len); - Status SIsmember(std::string& key, std::string &member); + Status SIsmember(std::string& key, std::string& member); Status SMembers(std::string& key, std::vector *members); Status SRem(std::string& key, std::vector &members); Status SRandmember(std::string& key, int64_t count, std::vector *members); @@ -118,7 +118,7 @@ class RedisCache { Status ZAdd(std::string& key, std::vector &score_members); Status ZCard(std::string& key, uint64_t *len); Status ZCount(std::string& key, std::string &min, std::string &max, uint64_t *len); - Status ZIncrby(std::string& key, std::string &member, double increment); + Status ZIncrby(std::string& key, std::string& member, double increment); Status ZRange(std::string& key, int64_t start, int64_t stop, std::vector *score_members); @@ -126,7 +126,7 @@ class RedisCache { std::string &min, std::string &max, std::vector *score_members, int64_t offset = 0, int64_t count = -1); - Status ZRank(std::string& key, std::string &member, int64_t *rank); + Status ZRank(std::string& key, std::string& member, int64_t *rank); Status ZRem(std::string& key, std::vector &members); Status ZRemrangebyrank(std::string& key, std::string &min, std::string &max); Status ZRemrangebyscore(std::string& key, std::string &min, std::string &max); @@ -140,8 +140,8 @@ class RedisCache { Status ZRevrangebylex(std::string& key, std::string &min, std::string &max, std::vector *members); - Status ZRevrank(std::string& key, std::string &member, int64_t *rank); - Status ZScore(std::string& key, std::string &member, double *score); + Status ZRevrank(std::string& key, std::string& member, int64_t *rank); + Status ZScore(std::string& key, std::string& member, double *score); Status ZRangebylex(std::string& key, std::string &min, std::string &max, std::vector *members); diff --git a/src/cache/src/set.cc b/src/cache/src/set.cc index 2f723e6a88..aa1610e00b 100644 --- a/src/cache/src/set.cc +++ b/src/cache/src/set.cc @@ -47,7 +47,7 @@ Status RedisCache::SCard(std::string& key, uint64_t *len) { return Status::OK(); } -Status RedisCache::SIsmember(std::string& key, std::string &member) { +Status RedisCache::SIsmember(std::string& key, std::string& member) { int is_member = 0; robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); robj *mobj = createObject(OBJ_STRING, sdsnewlen(member.data(), member.size())); diff --git a/src/cache/src/zset.cc b/src/cache/src/zset.cc index 6a9b202622..3333cc6854 100644 --- a/src/cache/src/zset.cc +++ b/src/cache/src/zset.cc @@ -68,7 +68,7 @@ Status RedisCache::ZCount(std::string& key, std::string &min, std::string &max, return Status::OK(); } -Status RedisCache::ZIncrby(std::string& key, std::string &member, double increment) { +Status RedisCache::ZIncrby(std::string& key, std::string& member, double increment) { if (C_OK != RcFreeMemoryIfNeeded(cache_)) { return Status::Corruption("[error] Free memory faild !"); } @@ -145,7 +145,7 @@ Status RedisCache::ZRangebyscore(std::string& key, std::string &min, std::string return Status::OK(); } -Status RedisCache::ZRank(std::string& key, std::string &member, int64_t *rank) { +Status RedisCache::ZRank(std::string& key, std::string& member, int64_t *rank) { robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); robj *mobj = createObject(OBJ_STRING, sdsnewlen(member.data(), member.size())); DEFER { @@ -305,7 +305,7 @@ Status RedisCache::ZRevrangebylex(std::string& key, std::string &min, std::strin return Status::OK(); } -Status RedisCache::ZRevrank(std::string& key, std::string &member, int64_t *rank) { +Status RedisCache::ZRevrank(std::string& key, std::string& member, int64_t *rank) { robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); robj *mobj = createObject(OBJ_STRING, sdsnewlen(member.data(), member.size())); DEFER { @@ -324,7 +324,7 @@ Status RedisCache::ZRevrank(std::string& key, std::string &member, int64_t *rank return Status::OK(); } -Status RedisCache::ZScore(std::string& key, std::string &member, double *score) { +Status RedisCache::ZScore(std::string& key, std::string& member, double *score) { robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); robj *mobj = createObject(OBJ_STRING, sdsnewlen(member.data(), member.size())); DEFER { diff --git a/src/pika.cc b/src/pika.cc index 8128fa00e7..c016660ec1 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -104,7 +104,6 @@ static void create_pid_file() { size_t pos = path.find_last_of('/'); if (pos != std::string::npos) { - // mkpath(path.substr(0, pos).c_str(), 0755); pstd::CreateDir(path.substr(0, pos)); } else { path = kPikaPidFile; @@ -234,7 +233,7 @@ int main(int argc, char* argv[]) { } // stop PikaReplicaManager first,avoid internal threads - // may references to dead PikaServer + // may reference to dead PikaServer g_pika_rm->Stop(); return 0; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 7fe4ed4149..de1c5e6557 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -106,7 +106,6 @@ void SlaveofCmd::DoInitial() { is_none_ = true; return; } - // self is master of A , want to slaveof B if ((g_pika_server->role() & PIKA_ROLE_MASTER) != 0) { res_.SetRes(CmdRes::kErrOther, "already master of others, invalid usage"); @@ -1660,6 +1659,12 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeNumber(&config_body, g_pika_conf->small_compaction_threshold()); } + if (pstd::stringmatch(pattern.data(), "small-compaction-duration-threshold", 1) != 0) { + elements += 2; + EncodeString(&config_body, "small-compaction-duration-threshold"); + EncodeNumber(&config_body, g_pika_conf->small_compaction_duration_threshold()); + } + if (pstd::stringmatch(pattern.data(), "max-background-flushes", 1) != 0) { elements += 2; EncodeString(&config_body, "max-background-flushes"); @@ -2017,7 +2022,7 @@ void ConfigCmd::ConfigGet(std::string& ret) { void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { std::string set_item = config_args_v_[1]; if (set_item == "*") { - ret = "*28\r\n"; + ret = "*29\r\n"; EncodeString(&ret, "timeout"); EncodeString(&ret, "requirepass"); EncodeString(&ret, "masterauth"); @@ -2036,6 +2041,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { EncodeString(&ret, "write-binlog"); EncodeString(&ret, "max-cache-statistic-keys"); EncodeString(&ret, "small-compaction-threshold"); + EncodeString(&ret, "small-compaction-duration-threshold"); EncodeString(&ret, "max-client-response-size"); EncodeString(&ret, "db-sync-speed"); EncodeString(&ret, "compact-cron"); diff --git a/src/pika_cache.cc b/src/pika_cache.cc index 4b90750587..bf1b910966 100644 --- a/src/pika_cache.cc +++ b/src/pika_cache.cc @@ -594,7 +594,7 @@ Status PikaCache::SCard(std::string& key, uint64_t *len) { return caches_[cache_index]->SCard(key, len); } -Status PikaCache::SIsmember(std::string& key, std::string &member) { +Status PikaCache::SIsmember(std::string& key, std::string& member) { int cache_index = CacheIndex(key); std::lock_guard lm(*cache_mutexs_[cache_index]); return caches_[cache_index]->SIsmember(key, member); @@ -903,7 +903,7 @@ Status PikaCache::ZCount(std::string& key, std::string &min, std::string &max, u } } -Status PikaCache::ZIncrby(std::string& key, std::string &member, double increment) { +Status PikaCache::ZIncrby(std::string& key, std::string& member, double increment) { int cache_index = CacheIndex(key); std::lock_guard lm(*cache_mutexs_[cache_index]); return caches_[cache_index]->ZIncrby(key, member, increment); @@ -943,7 +943,7 @@ bool PikaCache::ReloadCacheKeyIfNeeded(cache::RedisCache *cache_obj, std::string } } -Status PikaCache::ZIncrbyIfKeyExist(std::string& key, std::string &member, double increment, ZIncrbyCmd *cmd) { +Status PikaCache::ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd *cmd) { auto eps = std::numeric_limits::epsilon(); if (-eps < increment && increment < eps) { return Status::NotFound("icrement is 0, nothing to be done"); @@ -971,7 +971,7 @@ Status PikaCache::ZIncrbyIfKeyExist(std::string& key, std::string &member, doubl ReloadCacheKeyIfNeeded(cache_obj, key, cache_len); return s; }; - auto RemCacheKeyMember = [this, cache_obj, &key, cache_len](const std::string &member, bool check = true) { + auto RemCacheKeyMember = [this, cache_obj, &key, cache_len](const std::string& member, bool check = true) { std::vector member_rm = {member}; auto s = cache_obj->ZRem(key, member_rm); if (check) { @@ -1134,7 +1134,7 @@ Status PikaCache::ZRangebyscore(std::string& key, std::string &min, std::string } } -Status PikaCache::ZRank(std::string& key, std::string &member, int64_t *rank, const std::shared_ptr& db) { +Status PikaCache::ZRank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr& db) { std::string CachePrefixKeyZ = PCacheKeyPrefixZ + key; int cache_index = CacheIndex(CachePrefixKeyZ); std::lock_guard lm(*cache_mutexs_[cache_index]); @@ -1324,7 +1324,7 @@ Status PikaCache::ZRevrangebylex(std::string& key, std::string &min, std::string } } -Status PikaCache::ZRevrank(std::string& key, std::string &member, int64_t *rank, const std::shared_ptr& db) { +Status PikaCache::ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr& db) { std::string CachePrefixKeyZ = PCacheKeyPrefixZ + key; int cache_index = CacheIndex(CachePrefixKeyZ); std::lock_guard lm(*cache_mutexs_[cache_index]); @@ -1347,7 +1347,7 @@ Status PikaCache::ZRevrank(std::string& key, std::string &member, int64_t *rank, } } } -Status PikaCache::ZScore(std::string& key, std::string &member, double *score, const std::shared_ptr& db) { +Status PikaCache::ZScore(std::string& key, std::string& member, double *score, const std::shared_ptr& db) { int cache_index = CacheIndex(key); std::lock_guard lm(*cache_mutexs_[cache_index]); auto s = caches_[cache_index]->ZScore(key, member, score); diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index ca7616240a..f02881c84e 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -215,39 +215,6 @@ void PikaClientConn::DoBackgroundTask(void* arg) { conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); } -void PikaClientConn::DoExecTask(void* arg) { - std::unique_ptr bg_arg(static_cast(arg)); - std::shared_ptr cmd_ptr = bg_arg->cmd_ptr; - std::shared_ptr conn_ptr = bg_arg->conn_ptr; - std::shared_ptr resp_ptr = bg_arg->resp_ptr; - LogOffset offset = bg_arg->offset; - std::string db_name = bg_arg->db_name; - bg_arg.reset(); - - cmd_ptr->SetStage(Cmd::kExecuteStage); - cmd_ptr->Execute(); - if (g_pika_conf->slowlog_slower_than() >= 0) { - conn_ptr->ProcessSlowlog(cmd_ptr->argv(), cmd_ptr->GetDoDuration()); - } - - std::shared_ptr db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); - if (!db) { - LOG(WARNING) << "Sync Master DB not exist " << db_name; - return; - } - db->ConsensusUpdateAppliedIndex(offset); - - if (!conn_ptr || !resp_ptr) { - return; - } - - *resp_ptr = std::move(cmd_ptr->res().message()); - // last step to update resp_num, early update may casue another therad may - // TryWriteResp success with resp_ptr not updated - conn_ptr->resp_num--; - conn_ptr->TryWriteResp(); -} - void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs) { resp_num.store(static_cast(argvs.size())); for (const auto& argv : argvs) { @@ -283,11 +250,6 @@ bool PikaClientConn::IsInTxn() { return txn_state_[TxnStateBitMask::Start]; } -bool PikaClientConn::IsTxnFailed() { - std::lock_guard lg(txn_state_mu_); - return txn_state_[TxnStateBitMask::WatchFailed] | txn_state_[TxnStateBitMask::InitCmdFailed]; -} - bool PikaClientConn::IsTxnInitFailed() { std::lock_guard lg(txn_state_mu_); return txn_state_[TxnStateBitMask::InitCmdFailed]; diff --git a/src/pika_command.cc b/src/pika_command.cc index 0244294d4e..c6e21c7470 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -24,7 +24,6 @@ #include "include/pika_slot_command.h" #include "include/pika_zset.h" #include "pstd_defer.h" -#include "include/pika_cache.h" #include "src/pstd/include/scope_record_lock.h" using pstd::Status; diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 6f443cc634..4cc61ca331 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -337,8 +337,18 @@ int PikaConf::Load() { small_compaction_threshold_ = 5000; GetConfInt("small-compaction-threshold", &small_compaction_threshold_); - if (small_compaction_threshold_ <= 0 || small_compaction_threshold_ >= 100000) { - small_compaction_threshold_ = 5000; + if (small_compaction_threshold_ < 0) { + small_compaction_threshold_ = 0; + } else if (small_compaction_threshold_ >= 100000) { + small_compaction_threshold_ = 100000; + } + + small_compaction_duration_threshold_ = 10000; + GetConfInt("small-compaction-duration-threshold", &small_compaction_duration_threshold_); + if (small_compaction_duration_threshold_ < 0) { + small_compaction_duration_threshold_ = 0; + } else if (small_compaction_duration_threshold_ >= 1000000) { + small_compaction_duration_threshold_ = 1000000; } max_background_flushes_ = 1; @@ -610,6 +620,7 @@ int PikaConf::ConfigRewrite() { SetConfStr("replication-id", replication_id_); SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_); SetConfInt("small-compaction-threshold", small_compaction_threshold_); + SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_); SetConfInt("max-client-response-size", static_cast(max_client_response_size_)); SetConfInt("db-sync-speed", db_sync_speed_); SetConfStr("compact-cron", compact_cron_); diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 4d35e73a1e..854217674a 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -62,11 +62,6 @@ Status Context::Init() { } } -void Context::PrepareUpdateAppliedIndex(const LogOffset& offset) { - std::lock_guard l(rwlock_); - applied_win_.Push(SyncWinItem(offset)); -} - void Context::UpdateAppliedIndex(const LogOffset& offset) { std::lock_guard l(rwlock_); LogOffset cur_offset; @@ -166,18 +161,6 @@ MemLog::MemLog() = default; int MemLog::Size() { return static_cast(logs_.size()); } -// purge [begin, offset] -Status MemLog::PurgeLogs(const LogOffset& offset, std::vector* logs) { - std::lock_guard l_logs(logs_mu_); - int index = InternalFindLogByBinlogOffset(offset); - if (index < 0) { - return Status::NotFound("Cant find correct index"); - } - logs->assign(logs_.begin(), logs_.begin() + index + 1); - logs_.erase(logs_.begin(), logs_.begin() + index + 1); - return Status::OK(); -} - // keep mem_log [mem_log.begin, offset] Status MemLog::TruncateTo(const LogOffset& offset) { std::lock_guard l_logs(logs_mu_); diff --git a/src/pika_data_distribution.cc b/src/pika_data_distribution.cc index cfab3937a8..49d6af125e 100644 --- a/src/pika_data_distribution.cc +++ b/src/pika_data_distribution.cc @@ -7,7 +7,5 @@ void HashModulo::Init() {} -uint32_t HashModulo::Distribute(const std::string& str) { - return std::hash()(str); -} + diff --git a/src/pika_db.cc b/src/pika_db.cc index 6e66361a38..fb819905fe 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -32,7 +32,7 @@ std::string DbSyncPath(const std::string& sync_path, const std::string& db_name) DB::DB(std::string db_name, const std::string& db_path, const std::string& log_path) - : db_name_(std::move(db_name)), bgsave_engine_(nullptr) { + : db_name_(db_name), bgsave_engine_(nullptr) { db_path_ = DBPath(db_path, db_name_); bgsave_sub_path_ = db_name; dbsync_path_ = DbSyncPath(g_pika_conf->db_sync_path(), db_name); @@ -190,21 +190,6 @@ void DB::InitKeyScan() { key_scan_info_.duration = -1; // duration -1 mean the task in processing } -Status DB::MovetoToTrash(const std::string& path) { - std::string path_tmp = path; - if (path_tmp[path_tmp.length() - 1] == '/') { - path_tmp.erase(path_tmp.length() - 1); - } - path_tmp += "_deleting/"; - if (pstd::RenameFile(path, path_tmp) != 0) { - LOG(WARNING) << "Failed to move " << path << " to trash, error: " << strerror(errno); - return Status::Corruption("Failed to move %s to trash", path); - } - g_pika_server->PurgeDir(path_tmp); - LOG(WARNING) << path << " move to trash success"; - return Status::OK(); -} - void DB::DbRWLockWriter() { db_rwlock_.lock(); } DisplayCacheInfo DB::GetCacheInfo() { diff --git a/src/pika_monotonic_time.cc b/src/pika_monotonic_time.cc new file mode 100644 index 0000000000..e1c8c51496 --- /dev/null +++ b/src/pika_monotonic_time.cc @@ -0,0 +1,52 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifdef __APPLE__ // Mac +#include + +#include "include/pika_monotonic_time.h" + +monotime getMonotonicUs() { + static mach_timebase_info_data_t timebase; + if (timebase.denom == 0) { + mach_timebase_info(&timebase); + } + uint64_t nanos = mach_absolute_time() * timebase.numer / timebase.denom; + return nanos / 1000; +} + +#elif __linux__ // Linux + +#ifdef __x86_64__ // x86_64 + +#include + +#include "include/pika_monotonic_time.h" + +monotime getMonotonicUs() { + timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000 + static_cast(ts.tv_nsec) / 1000; +} + +#elif __arm__ || __aarch64__ // ARM + +#include + +#include "include/pika_monotonic_time.h" + +uint64_t getMonotonicUs() { + timeval tv; + gettimeofday(&tv, nullptr); + return static_cast(tv.tv_sec) * 1000000 + static_cast(tv.tv_usec); +} + +#else +#error "Unsupported architecture for Linux" +#endif // __x86_64__, __arm__ + +#else +#error "Unsupported platform" +#endif // __APPLE__, __linux__ \ No newline at end of file diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index f219de2f33..1a3cb224c6 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -149,6 +149,7 @@ Status PikaReplClient::SendDBSync(const std::string& ip, uint32_t port, const st node->set_port(g_pika_server->port()); InnerMessage::Slot* db = db_sync->mutable_slot(); db->set_db_name(db_name); + db->set_slot_id(0); InnerMessage::BinlogOffset* binlog_offset = db_sync->mutable_binlog_offset(); binlog_offset->set_filenum(boffset.filenum); @@ -172,6 +173,7 @@ Status PikaReplClient::SendTrySync(const std::string& ip, uint32_t port, const s node->set_port(g_pika_server->port()); InnerMessage::Slot* db = try_sync->mutable_slot(); db->set_db_name(db_name); + db->set_slot_id(0); InnerMessage::BinlogOffset* binlog_offset = try_sync->mutable_binlog_offset(); binlog_offset->set_filenum(boffset.filenum); @@ -195,6 +197,7 @@ Status PikaReplClient::SendBinlogSync(const std::string& ip, uint32_t port, cons node->set_ip(local_ip); node->set_port(g_pika_server->port()); binlog_sync->set_db_name(db_name); + binlog_sync->set_slot_id(0); binlog_sync->set_first_send(is_first_send); InnerMessage::BinlogOffset* ack_range_start = binlog_sync->mutable_ack_range_start(); @@ -237,6 +240,7 @@ Status PikaReplClient::SendRemoveSlaveNode(const std::string& ip, uint32_t port, InnerMessage::Slot* db = remove_slave_node->mutable_slot(); db->set_db_name(db_name); + db->set_slot_id(0); std::string to_send; if (!request.SerializeToString(&to_send)) { diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index de797b70eb..8ad58de270 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -94,6 +94,7 @@ void PikaReplServer::BuildBinlogSyncResp(const std::vector& tasks, In binlog_sync->set_session_id(task.rm_node_.SessionId()); InnerMessage::Slot* db = binlog_sync->mutable_slot(); db->set_db_name(task.rm_node_.DBName()); + db->set_slot_id(0); InnerMessage::BinlogOffset* boffset = binlog_sync->mutable_binlog_offset(); BuildBinlogOffset(task.binlog_chip_.offset_, boffset); binlog_sync->set_binlog(task.binlog_chip_.binlog_); diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 0b9292c570..1a3eada822 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -58,6 +58,7 @@ void PikaReplServerConn::HandleMetaSyncRequest(void* arg) { for (const auto& db_struct : db_structs) { InnerMessage::InnerResponse_MetaSync_DBInfo* db_info = meta_sync->add_dbs_info(); db_info->set_db_name(db_struct.db_name); + db_info->set_slot_num(1); } } } @@ -87,6 +88,7 @@ void PikaReplServerConn::HandleTrySyncRequest(void* arg) { try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError); InnerMessage::Slot* db_response = try_sync_response->mutable_slot(); db_response->set_db_name(db_name); + db_response->set_slot_id(0); bool pre_success = true; response.set_type(InnerMessage::Type::kTrySync); @@ -290,6 +292,7 @@ void PikaReplServerConn::HandleDBSyncRequest(void* arg) { InnerMessage::InnerResponse::DBSync* db_sync_response = response.mutable_db_sync(); InnerMessage::Slot* db_response = db_sync_response->mutable_slot(); db_response->set_db_name(db_name); + db_response->set_slot_id(0); LOG(INFO) << "Handle DBSync Request"; bool prior_success = true; @@ -468,6 +471,7 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) { InnerMessage::InnerResponse::RemoveSlaveNode* remove_slave_node_response = response.add_remove_slave_node(); InnerMessage::Slot* slot_response = remove_slave_node_response->mutable_slot (); slot_response->set_db_name(db_name); + slot_response->set_slot_id(0); InnerMessage::Node* node_response = remove_slave_node_response->mutable_node(); node_response->set_ip(g_pika_server->host()); node_response->set_port(g_pika_server->port()); diff --git a/src/pika_server.cc b/src/pika_server.cc index 7783629c92..349be5b7de 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -4,26 +4,22 @@ // of patent rights can be found in the PATENTS file in the same directory. #include -#include #include #include -#include #include #include #include #include #include -#include -#include "net/include/bg_thread.h" #include "net/include/net_cli.h" #include "net/include/net_interfaces.h" -#include "net/include/redis_cli.h" #include "net/include/net_stats.h" #include "pstd/include/env.h" #include "pstd/include/rsync.h" #include "include/pika_cmd_table_manager.h" #include "include/pika_dispatch_thread.h" +#include "include/pika_monotonic_time.h" #include "include/pika_instant.h" #include "include/pika_server.h" #include "include/pika_rm.h" @@ -1129,8 +1125,6 @@ std::unordered_map PikaServer::ServerExecCountDB() { return res; } -QpsStatistic PikaServer::ServerDBStat(const std::string& db_name) { return statistic_.DBStat(db_name); } - std::unordered_map PikaServer::ServerAllDBStat() { return statistic_.AllDBStat(); } int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); } @@ -1384,15 +1378,6 @@ void PikaServer::AutoUpdateNetworkMetric() { current_time, factor); } -uint64_t PikaServer::getMonotonicUs() { - static mach_timebase_info_data_t timebase; - if (timebase.denom == 0) { - mach_timebase_info(&timebase); - } - uint64_t nanos = mach_absolute_time() * timebase.numer / timebase.denom; - return nanos / 1000; -} - void PikaServer::PrintThreadPoolQueueStatus() { // Print the current queue size if it exceeds QUEUE_SIZE_THRESHOLD_PERCENTAGE/100 of the maximum queue size. size_t cur_size = ClientProcessorThreadPoolCurQueueSize(); diff --git a/src/pika_zset.cc b/src/pika_zset.cc index 2eb8b110b5..4805038027 100644 --- a/src/pika_zset.cc +++ b/src/pika_zset.cc @@ -796,10 +796,11 @@ void ZUnionstoreCmd::DoBinlog(const std::shared_ptr& db) { PikaCmdArgsType del_args; del_args.emplace_back("del"); del_args.emplace_back(dest_key_); - del_cmd_->Initial(del_args, db_name_); - del_cmd_->SetConn(GetConn()); - del_cmd_->SetResp(resp_.lock()); - del_cmd_->DoBinlog(db); + std::shared_ptr del_cmd = std::make_unique(kCmdNameDel, -2, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB); + del_cmd->Initial(del_args, db_name_); + del_cmd->SetConn(GetConn()); + del_cmd->SetResp(resp_.lock()); + del_cmd->DoBinlog(db); if(value_to_dest_.empty()){ // The union operation got an empty set, only use del to simulate overwrite the dest_key with empty set @@ -873,10 +874,11 @@ void ZInterstoreCmd::DoBinlog(const std::shared_ptr& db) { PikaCmdArgsType del_args; del_args.emplace_back("del"); del_args.emplace_back(dest_key_); - del_cmd_->Initial(del_args, db_name_); - del_cmd_->SetConn(GetConn()); - del_cmd_->SetResp(resp_.lock()); - del_cmd_->DoBinlog(db); + std::shared_ptr del_cmd = std::make_unique(kCmdNameDel, -2, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB); + del_cmd->Initial(del_args, db_name_); + del_cmd->SetConn(GetConn()); + del_cmd->SetResp(resp_.lock()); + del_cmd->DoBinlog(db); if (value_to_dest_.size() == 0) { //The inter operation got an empty set, just exec del to simulate overwrite an empty set to dest_key diff --git a/src/rsync_client.cc b/src/rsync_client.cc index ea68a5f063..33eadacf72 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -168,6 +168,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { request.set_reader_index(index); request.set_type(kRsyncFile); request.set_db_name(db_name_); + request.set_slot_id(0); FileRequest* file_req = request.mutable_file_req(); file_req->set_filename(filename); file_req->set_offset(offset); @@ -329,6 +330,7 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set filenames; std::string snapshot_uuid; @@ -158,9 +159,10 @@ void RsyncServerConn::HandleFileRsyncRequest(void* arg) { response.set_code(RsyncService::kOk); response.set_type(RsyncService::kRsyncFile); response.set_db_name(db_name); + response.set_slot_id(0); std::string snapshot_uuid; - Status s = g_pika_server->GetDumpUUID(db_name, &snapshot_uuid); + Status s = g_pika_server->GetDumpUUID(db_name, &snapshot_uuid); response.set_snapshot_uuid(snapshot_uuid); if (!s.ok()) { LOG(WARNING) << "rsyncserver get snapshotUUID failed"; diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index b756c5635f..fce6d546c5 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -66,6 +66,7 @@ struct StorageOptions { bool share_block_cache = false; size_t statistics_max_size = 0; size_t small_compaction_threshold = 5000; + size_t small_compaction_duration_threshold = 10000; Status ResetOptions(const OptionType& option_type, const std::unordered_map& options_map); }; @@ -1034,6 +1035,7 @@ class Storage { Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); + Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); std::string GetCurrentTaskType(); Status GetUsage(const std::string& property, uint64_t* result); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index ca56c55091..b6b848c6d4 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -12,8 +12,9 @@ Redis::Redis(Storage* const s, const DataType& type) : storage_(s), type_(type), lock_mgr_(std::make_shared(1000, 0, std::make_shared())), - small_compaction_threshold_(5000) { - statistics_store_ = std::make_unique>(); + small_compaction_threshold_(5000), + small_compaction_duration_threshold_(10000) { + statistics_store_ = std::make_unique>(); scan_cursors_store_ = std::make_unique>(); scan_cursors_store_->SetCapacity(5000); default_compact_range_options_.exclusive_manual_compaction = false; @@ -46,23 +47,40 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) { return Status::OK(); } -Status Redis::SetSmallCompactionThreshold(size_t small_compaction_threshold) { +Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) { small_compaction_threshold_ = small_compaction_threshold; return Status::OK(); } -Status Redis::UpdateSpecificKeyStatistics(const std::string& key, size_t count) { - if ((statistics_store_->Capacity() != 0U) && (count != 0U)) { - size_t total = 0; - statistics_store_->Lookup(key, &total); - statistics_store_->Insert(key, total + count); - AddCompactKeyTaskIfNeeded(key, total + count); +Status Redis::SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold) { + small_compaction_duration_threshold_ = small_compaction_duration_threshold; + return Status::OK(); +} + +Status Redis::UpdateSpecificKeyStatistics(const std::string& key, uint64_t count) { + if ((statistics_store_->Capacity() != 0U) && (count != 0U) && (small_compaction_threshold_ != 0U)) { + KeyStatistics data; + statistics_store_->Lookup(key, &data); + data.AddModifyCount(count); + statistics_store_->Insert(key, data); + AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration()); + } + return Status::OK(); +} + +Status Redis::UpdateSpecificKeyDuration(const std::string& key, uint64_t duration) { + if ((statistics_store_->Capacity() != 0U) && (duration != 0U) && (small_compaction_duration_threshold_ != 0U)) { + KeyStatistics data; + statistics_store_->Lookup(key, &data); + data.AddDuration(duration); + statistics_store_->Insert(key, data); + AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration()); } return Status::OK(); } -Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, size_t total) { - if (total < small_compaction_threshold_) { +Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration) { + if (count < small_compaction_threshold_ || duration < small_compaction_duration_threshold_) { return Status::OK(); } else { storage_->AddBGTask({type_, kCompactRange, {key, key}}); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index ac1b560f51..f1615e7b8f 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -14,6 +14,8 @@ #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "pstd/include/env.h" + #include "src/lock_mgr.h" #include "src/lru_cache.h" #include "src/mutex_impl.h" @@ -30,6 +32,61 @@ class Redis { rocksdb::DB* GetDB() { return db_; } + struct KeyStatistics { + size_t window_size; + std::deque durations; + + uint64_t modify_count; + + KeyStatistics() : KeyStatistics(10) {} + + KeyStatistics(size_t size) : window_size(size + 2), modify_count(0) {} + + void AddDuration(uint64_t duration) { + durations.push_back(duration); + while (durations.size() > window_size) { + durations.pop_front(); + } + } + uint64_t AvgDuration() { + if (durations.size () < window_size) { + return 0; + } + uint64_t min = durations[0]; + uint64_t max = durations[0]; + uint64_t sum = 0; + for (auto duration : durations) { + if (duration < min) { + min = duration; + } + if (duration > max) { + max = duration; + } + sum += duration; + } + return (sum - max - min) / (durations.size() - 2); + } + void AddModifyCount(uint64_t count) { + modify_count += count; + } + uint64_t ModifyCount() { + return modify_count; + } + }; + + struct KeyStatisticsDurationGuard { + Redis* ctx; + std::string key; + uint64_t start_us; + KeyStatisticsDurationGuard(Redis* that, const std::string& key): ctx(that), key(key), start_us(pstd::NowMicros()) { + } + ~KeyStatisticsDurationGuard() { + uint64_t end_us = pstd::NowMicros(); + uint64_t duration = end_us > start_us ? end_us - start_us : 0; + ctx->UpdateSpecificKeyDuration(key, duration); + } + }; + Status SetOptions(const OptionType& option_type, const std::unordered_map& options); void SetWriteWalOptions(const bool is_wal_disable); @@ -54,7 +111,8 @@ class Redis { virtual Status TTL(const Slice& key, int64_t* timestamp) = 0; Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys); - Status SetSmallCompactionThreshold(size_t small_compaction_threshold); + Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold); + Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold); void GetRocksDBInfo(std::string &info, const char *prefix); protected: @@ -75,11 +133,13 @@ class Redis { Status StoreScanNextPoint(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_point); // For Statistics - std::atomic small_compaction_threshold_; - std::unique_ptr> statistics_store_; + std::atomic_uint64_t small_compaction_threshold_; + std::atomic_uint64_t small_compaction_duration_threshold_; + std::unique_ptr> statistics_store_; - Status UpdateSpecificKeyStatistics(const std::string& key, size_t count); - Status AddCompactKeyTaskIfNeeded(const std::string& key, size_t total); + Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count); + Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration); + Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration); }; } // namespace storage diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index 6d28f76ec7..4d1c9bf6b7 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -22,6 +22,7 @@ RedisHashes::RedisHashes(Storage* const s, const DataType& type) : Redis(s, type Status RedisHashes::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -298,6 +299,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector* fvs) { version = parsed_hashes_meta_value.version(); HashesDataKey hashes_data_key(key, version, ""); Slice prefix = hashes_data_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedHashesDataKey parsed_hashes_data_key(iter->key()); @@ -516,6 +518,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector* fields) { version = parsed_hashes_meta_value.version(); HashesDataKey hashes_data_key(key, version, ""); Slice prefix = hashes_data_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedHashesDataKey parsed_hashes_data_key(iter->key()); @@ -788,6 +791,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector* values) { version = parsed_hashes_meta_value.version(); HashesDataKey hashes_data_key(key, version, ""); Slice prefix = hashes_data_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { values->push_back(iter->value().ToString()); @@ -850,6 +854,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p HashesDataKey hashes_data_prefix(key, version, sub_field); HashesDataKey hashes_start_data_key(key, version, start_point); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -900,6 +905,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, version, start_field); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -956,6 +962,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, version, field_start); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(start_no_limit ? prefix : hashes_start_data_key.Encode()); iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -1016,6 +1023,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, start_key_version, start_key_field); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->SeekForPrev(hashes_start_data_key.Encode().ToString()); iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Prev()) { diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index 6360f94620..e2d484b3e4 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -26,6 +26,7 @@ RedisLists::RedisLists(Storage* const s, const DataType& type) : Redis(s, type) Status RedisLists::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index 4d8c7b2eee..f76217eb32 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -21,8 +21,6 @@ namespace storage { RedisSets::RedisSets(Storage* const s, const DataType& type) : Redis(s, type) { - spop_counts_store_ = std::make_unique>(); - spop_counts_store_->SetCapacity(1000); } RedisSets::~RedisSets() = default; @@ -30,6 +28,7 @@ RedisSets::~RedisSets() = default; rocksdb::Status RedisSets::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); rocksdb::Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -325,6 +324,7 @@ rocksdb::Status RedisSets::SDiff(const std::vector& keys, std::vect version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -393,6 +393,7 @@ rocksdb::Status RedisSets::SDiffstore(const Slice& destination, const std::vecto version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -493,6 +494,7 @@ rocksdb::Status RedisSets::SInter(const std::vector& keys, std::vec version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -575,6 +577,7 @@ rocksdb::Status RedisSets::SInterstore(const Slice& destination, const std::vect version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -685,6 +688,7 @@ rocksdb::Status RedisSets::SMembers(const Slice& key, std::vector* version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(key, version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -832,7 +836,7 @@ rocksdb::Status RedisSets::SMove(const Slice& source, const Slice& destination, return s; } -rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* members, bool* need_compact, int64_t cnt) { +rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* members, int64_t cnt) { std::default_random_engine engine; std::string meta_value; @@ -890,6 +894,7 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* memb SetsMemberKey sets_member_key(key, version, Slice()); int64_t del_count = 0; + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(default_read_options_, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && cur_index < size; @@ -911,34 +916,14 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* memb parsed_sets_meta_value.ModifyCount(static_cast(-cnt)); batch.Put(handles_[0], key, meta_value); delete iter; - } - } } else { return s; } - uint64_t count = 0; - uint64_t duration = pstd::NowMicros() - start_us; - AddAndGetSpopCount(key.ToString(), &count); - if (duration >= SPOP_COMPACT_THRESHOLD_DURATION - || count >= SPOP_COMPACT_THRESHOLD_COUNT) { - *need_compact = true; - ResetSpopCount(key.ToString()); - } return db_->Write(default_write_options_, &batch); } -rocksdb::Status RedisSets::ResetSpopCount(const std::string& key) { return spop_counts_store_->Remove(key); } - -rocksdb::Status RedisSets::AddAndGetSpopCount(const std::string& key, uint64_t* count) { - size_t old_count = 0; - spop_counts_store_->Lookup(key, &old_count); - spop_counts_store_->Insert(key, old_count + 1); - *count = old_count + 1; - return rocksdb::Status::OK(); -} - rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vector* members) { if (count == 0) { return rocksdb::Status::OK(); @@ -988,6 +973,7 @@ rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vec int32_t cur_index = 0; int32_t idx = 0; SetsMemberKey sets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(default_read_options_, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && cur_index < size; iter->Next(), cur_index++) { if (static_cast(idx) >= targets.size()) { @@ -1087,6 +1073,7 @@ rocksdb::Status RedisSets::SUnion(const std::vector& keys, std::vec for (const auto& key_version : vaild_sets) { SetsMemberKey sets_member_key(key_version.key, key_version.version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key_version.key); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -1136,6 +1123,7 @@ rocksdb::Status RedisSets::SUnionstore(const Slice& destination, const std::vect for (const auto& key_version : vaild_sets) { SetsMemberKey sets_member_key(key_version.key, key_version.version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key_version.key); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -1220,6 +1208,7 @@ rocksdb::Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::st SetsMemberKey sets_member_prefix(key, version, sub_member); SetsMemberKey sets_member_key(key, version, start_point); std::string prefix = sets_member_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { diff --git a/src/storage/src/redis_sets.h b/src/storage/src/redis_sets.h index aa303b16a5..2898d0e9e7 100644 --- a/src/storage/src/redis_sets.h +++ b/src/storage/src/redis_sets.h @@ -10,15 +10,10 @@ #include #include -#include "pstd/include/env.h" - #include "src/custom_comparator.h" #include "src/lru_cache.h" #include "src/redis.h" -#define SPOP_COMPACT_THRESHOLD_COUNT 500 -#define SPOP_COMPACT_THRESHOLD_DURATION (1000 * 1000) // 1000ms - namespace storage { class RedisSets : public Redis { @@ -46,7 +41,7 @@ class RedisSets : public Redis { Status SMembers(const Slice& key, std::vector* members); Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t* ttl); Status SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret); - Status SPop(const Slice& key, std::vector* members, bool* need_compact, int64_t cnt); + Status SPop(const Slice& key, std::vector* members, int64_t cnt); Status SRandmember(const Slice& key, int32_t count, std::vector* members); Status SRem(const Slice& key, const std::vector& members, int32_t* ret); Status SUnion(const std::vector& keys, std::vector* members); @@ -71,12 +66,6 @@ class RedisSets : public Redis { // Iterate all data void ScanDatabase(); - - private: - // For compact in time after multiple spop - std::unique_ptr> spop_counts_store_; - Status ResetSpopCount(const std::string& key); - Status AddAndGetSpopCount(const std::string& key, uint64_t* count); }; } // namespace storage diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index e516f9d140..4da415901f 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -31,6 +31,7 @@ RedisZSets::RedisZSets(Storage* const s, const DataType& type) : Redis(s, type) Status RedisZSets::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -229,6 +230,7 @@ Status RedisZSets::ZPopMax(const Slice& key, const int64_t count, std::vector::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); int32_t del_cnt = 0; for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Prev()) { @@ -274,6 +276,7 @@ Status RedisZSets::ZPopMin(const Slice& key, const int64_t count, std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); int32_t del_cnt = 0; for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Next()) { @@ -439,6 +442,7 @@ Status RedisZSets::ZCount(const Slice& key, double min, double max, bool left_cl int32_t stop_index = parsed_zsets_meta_value.count() - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -563,6 +567,7 @@ Status RedisZSets::ZRange(const Slice& key, int32_t start, int32_t stop, std::ve int32_t cur_index = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { if (cur_index >= start_index) { @@ -661,6 +666,7 @@ Status RedisZSets::ZRangebyscore(const Slice& key, double min, double max, bool int64_t skipped = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && index <= stop_index; iter->Next(), ++index) { bool left_pass = false; @@ -725,6 +731,7 @@ Status RedisZSets::ZRank(const Slice& key, const Slice& member, int32_t* rank) { int32_t stop_index = parsed_zsets_meta_value.count() - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && index <= stop_index; iter->Next(), ++index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); @@ -830,6 +837,7 @@ Status RedisZSets::ZRemrangebyrank(const Slice& key, int32_t start, int32_t stop return s; } ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { if (cur_index >= start_index) { @@ -878,6 +886,7 @@ Status RedisZSets::ZRemrangebyscore(const Slice& key, double min, double max, bo int32_t stop_index = parsed_zsets_meta_value.count() - 1; int32_t version = parsed_zsets_meta_value.version(); ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -953,6 +962,7 @@ Status RedisZSets::ZRevrange(const Slice& key, int32_t start, int32_t stop, std: int32_t cur_index = count - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && cur_index >= start_index; iter->Prev(), --cur_index) { @@ -991,6 +1001,7 @@ Status RedisZSets::ZRevrangebyscore(const Slice& key, double min, double max, bo int64_t skipped = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::nextafter(max, std::numeric_limits::max()), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left > 0; iter->Prev(), --left) { bool left_pass = false; @@ -1055,6 +1066,7 @@ Status RedisZSets::ZRevrank(const Slice& key, const Slice& member, int32_t* rank int32_t left = parsed_zsets_meta_value.count(); int32_t version = parsed_zsets_meta_value.version(); ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left >= 0; iter->Prev(), --left, ++rev_index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); @@ -1137,6 +1149,7 @@ Status RedisZSets::ZUnionstore(const Slice& destination, const std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, keys[idx]); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { @@ -1253,6 +1266,7 @@ Status RedisZSets::ZInterstore(const Slice& destination, const std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, vaild_zsets[0].key); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); @@ -1357,6 +1371,7 @@ Status RedisZSets::ZRangebylex(const Slice& key, const Slice& min, const Slice& int32_t cur_index = 0; int32_t stop_index = parsed_zsets_meta_value.count() - 1; ZSetsMemberKey zsets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -1417,6 +1432,7 @@ Status RedisZSets::ZRemrangebylex(const Slice& key, const Slice& min, const Slic int32_t cur_index = 0; int32_t stop_index = parsed_zsets_meta_value.count() - 1; ZSetsMemberKey zsets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -1641,6 +1657,7 @@ Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pa ZSetsMemberKey zsets_member_prefix(key, version, sub_member); ZSetsMemberKey zsets_member_key(key, version, start_point); std::string prefix = zsets_member_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index a7c25043c7..d16548b9c2 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -337,11 +337,7 @@ Status Storage::SMove(const Slice& source, const Slice& destination, const Slice } Status Storage::SPop(const Slice& key, std::vector* members, int64_t count) { - bool need_compact = false; - Status status = sets_db_->SPop(key, members, &need_compact, count); - if (need_compact) { - AddBGTask({kSets, kCompactRange, {key.ToString(), key.ToString()}}); - } + Status status = sets_db_->SPop(key, members, count); return status; } @@ -1661,6 +1657,14 @@ Status Storage::SetSmallCompactionThreshold(uint32_t small_compaction_threshold) return Status::OK(); } +Status Storage::SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) { + std::vector dbs = {sets_db_.get(), zsets_db_.get(), hashes_db_.get(), lists_db_.get()}; + for (const auto& db : dbs) { + db->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold); + } + return Status::OK(); +} + std::string Storage::GetCurrentTaskType() { int type = current_task_type_; switch (type) { diff --git a/tests/helpers/redis_queue.py b/tests/helpers/redis_queue.py index 3d0eb8fe12..9203c2d0db 100644 --- a/tests/helpers/redis_queue.py +++ b/tests/helpers/redis_queue.py @@ -6,13 +6,18 @@ START_FLAG = True + def enqueue(client: redis.Redis, queue_name: str): while START_FLAG: + n = client.zcard(queue_name) + if n >= 1000: + time.sleep(0.1) + continue now_ms = int(time.time() * 1000) pipeline = client.pipeline(transaction=False) for i in range(10): score = now_ms << 5 | i - pipeline.zadd(queue_name, {str(i): score}) + pipeline.zadd(queue_name, {str(score): score}) pipeline.execute() print("enqueue exit") @@ -23,6 +28,7 @@ def dequeue(client: redis.Redis, queue_name: str): start_time = time.time() n = client.zcard(queue_name) if n <= 10: + time.sleep(0.1) continue res = client.zremrangebyrank(queue_name, 0, 9) latency = time.time() - start_time @@ -45,37 +51,58 @@ def compact(client: redis.Redis, queue_name: str): print("compact exit") +def auto_compact(client: redis.Redis): + client.config_set("max-cache-statistic-keys", 10000) + client.config_set("small-compaction-threshold", 10000) + client.config_set("small-compaction-duration-threshold", 10000) + + def main(): - if len(sys.argv) != 4: - print("Usage: python redis_queue.py ") + if len(sys.argv) != 5: + print("Usage: python redis_queue.py $redis_host $port $passwd [compact | auto_compact]") sys.exit(1) host = sys.argv[1] port = int(sys.argv[2]) passwd = sys.argv[3] - client_enqueue = redis.Redis(host=host, port=port, password=passwd) - client_dequeue = redis.Redis(host=host, port=port, password=passwd) - client_compact = redis.Redis(host=host, port=port, password=passwd) + mode = sys.argv[4] + + thread_list = [] queue_name = "test_queue" + + client_enqueue = redis.Redis(host=host, port=port, password=passwd) t1 = threading.Thread(target=enqueue, args=(client_enqueue, queue_name)) - t1.start() + thread_list.append(t1) + + client_dequeue = redis.Redis(host=host, port=port, password=passwd) t2 = threading.Thread(target=dequeue, args=(client_dequeue, queue_name)) - t2.start() - t3 = threading.Thread(target=compact, args=(client_compact, queue_name)) - t3.start() - - def signal_handler(signal, frame): + thread_list.append(t2) + + client_compact = redis.Redis(host=host, port=port, password=passwd) + if mode == "compact": + t3 = threading.Thread(target=compact, args=(client_compact, queue_name)) + thread_list.append(t3) + elif mode == "auto_compact": + auto_compact(client_compact) + else: + print("invalid compact mode: {}".format(mode)) + sys.exit(1) + + for t in thread_list: + t.start() + + def signal_handler(signal, frame): print("revc signal: {}".format(signal)) global START_FLAG START_FLAG = False - t1.join() - t2.join() - t3.join() + for t in thread_list: + t.join() print("exit") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGQUIT, signal_handler) + while True: time.sleep(60)