Skip to content

Commit

Permalink
add code
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxianrong committed Dec 26, 2023
2 parents 1defd41 + e2c6f92 commit 09cca3b
Show file tree
Hide file tree
Showing 62 changed files with 454 additions and 432 deletions.
1 change: 1 addition & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ max-cache-statistic-keys : 0
# a small compact is triggered automatically if the small compaction feature is enabled.
# small-compaction-threshold default value is 5000 and the value range is [1, 100000].
small-compaction-threshold : 5000
small-compaction-duration-threshold : 10000

# The maximum total size of all live memtables of the RocksDB instance that owned by Pika.
# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB
Expand Down
14 changes: 2 additions & 12 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable {

void debug() {
std::shared_lock l(rwlock_);
printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_);
}

private:
Expand All @@ -63,12 +63,8 @@ class Binlog : public pstd::noncopyable {
// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);

uint64_t file_size() { return file_size_; }

std::string filename() { return filename_; }

bool IsBinlogIoError() { return binlog_io_error_; }

// need to hold mutex_
void SetTerm(uint32_t term) {
std::lock_guard l(version_->rwlock_);
Expand All @@ -85,11 +81,9 @@ class Binlog : public pstd::noncopyable {

private:
pstd::Status Put(const char* item, int len);
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
// pstd::WritableFile *queue() { return queue_; }

void InitLogFile();
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);

/*
* Produce
Expand All @@ -109,17 +103,13 @@ class Binlog : public pstd::noncopyable {

int block_offset_ = 0;

char* pool_ = nullptr;
bool exit_all_consume_ = false;
const std::string binlog_path_;

uint64_t file_size_ = 0;

std::string filename_;

std::atomic<bool> binlog_io_error_;
// Not use
// int32_t retry_;
};

#endif
22 changes: 9 additions & 13 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cache/include/cache.h"
#include "storage/storage.h"

class PikaCacheLoadThread;
class ZIncrbyCmd;
class ZRangebyscoreCmd;
class ZRevrangebyscoreCmd;
Expand Down Expand Up @@ -45,10 +46,8 @@ struct CacheInfo {
}
};

class PikaCacheLoadThread;
class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<PikaCache> {
public:

PikaCache(int zset_cache_start_pos, int zset_cache_field_num_per_key);
~PikaCache();

Expand All @@ -60,24 +59,21 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
int CacheStatus(void);
void ClearHitRatio(void);
// Normal Commands
void Info(CacheInfo &info);
long long DbSize(void);
void Info(CacheInfo& info);
bool Exists(std::string& key);
void FlushDB(void);
void ActiveExpireCycle();
void ProcessCronTask(void);

rocksdb::Status Del(const std::vector<std::string> &keys);
rocksdb::Status Del(const std::vector<std::string>& keys);
rocksdb::Status Expire(std::string& key, int64_t ttl);
rocksdb::Status Expireat(std::string& key, int64_t ttl);
rocksdb::Status TTL(std::string& key, int64_t* ttl);
rocksdb::Status Persist(std::string& key);
rocksdb::Status Type(std::string& key, std::string* value);
rocksdb::Status RandomKey(std::string *key);
rocksdb::Status RandomKey(std::string* key);

// String Commands
rocksdb::Status Set(std::string& key, std::string& value, int64_t ttl);
rocksdb::Status SetWithoutTTL(std::string& key, std::string& value);
rocksdb::Status Setnx(std::string& key, std::string& value, int64_t ttl);
rocksdb::Status SetnxWithoutTTL(std::string& key, std::string& value);
rocksdb::Status Setxx(std::string& key, std::string& value, int64_t ttl);
Expand Down Expand Up @@ -138,7 +134,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status SAddnx(std::string& key, std::vector<std::string>& members, int64_t ttl);
rocksdb::Status SAddnxWithoutTTL(std::string& key, std::vector<std::string>& members);
rocksdb::Status SCard(std::string& key, uint64_t* len);
rocksdb::Status SIsmember(std::string& key, std::string &member);
rocksdb::Status SIsmember(std::string& key, std::string& member);
rocksdb::Status SMembers(std::string& key, std::vector<std::string>* members);
rocksdb::Status SRem(std::string& key, std::vector<std::string>& members);
rocksdb::Status SRandmember(std::string& key, int64_t count, std::vector<std::string>* members);
Expand All @@ -151,12 +147,12 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status ZCard(std::string& key, uint32_t* len, const std::shared_ptr<DB>& db);
rocksdb::Status ZCount(std::string& key, std::string& min, std::string& max, uint64_t* len, ZCountCmd* cmd);
rocksdb::Status ZIncrby(std::string& key, std::string& member, double increment);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string &member, double increment, ZIncrbyCmd* cmd);
rocksdb::Status ZIncrbyIfKeyExist(std::string& key, std::string& member, double increment, ZIncrbyCmd* cmd);
rocksdb::Status ZRange(std::string& key, int64_t start, int64_t stop, std::vector<storage::ScoreMember>* score_members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebyscore(std::string& key, std::string& min, std::string& max,
std::vector<storage::ScoreMember>* score_members, ZRangebyscoreCmd* cmd);
rocksdb::Status ZRank(std::string& key, std::string &member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRank(std::string& key, std::string& member, int64_t* rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZRem(std::string& key, std::vector<std::string>& members, std::shared_ptr<DB> db);
rocksdb::Status ZRemrangebyrank(std::string& key, std::string& min, std::string& max, int32_t ele_deleted = 0,
const std::shared_ptr<DB>& db = nullptr);
Expand All @@ -167,8 +163,8 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
std::vector<storage::ScoreMember>* score_members, ZRevrangebyscoreCmd* cmd);
rocksdb::Status ZRevrangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members,
const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string &member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string &member, double *score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRevrank(std::string& key, std::string& member, int64_t *rank, const std::shared_ptr<DB>& db);
rocksdb::Status ZScore(std::string& key, std::string& member, double* score, const std::shared_ptr<DB>& db);
rocksdb::Status ZRangebylex(std::string& key, std::string& min, std::string& max, std::vector<std::string>* members, const std::shared_ptr<DB>& db);
rocksdb::Status ZLexcount(std::string& key, std::string& min, std::string& max, uint64_t* len,
const std::shared_ptr<DB>& db);
Expand Down
20 changes: 8 additions & 12 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string db_name;
uint32_t slot_id;
};

struct TxnStateBitMask {
Expand Down Expand Up @@ -89,33 +88,30 @@ class PikaClientConn : public net::RedisConn {
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
static void DoExecTask(void* arg);

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override { return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }
const std::string& GetCurrentTable() override { return current_db_; }

// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &db_keys);
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();
bool IsInTxn();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);

net::ServerThread* server_thread() { return server_thread_; }

Expand Down
6 changes: 2 additions & 4 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const std::string kCmdNameSlotsMgrtSlotAsync = "slotsmgrtslot-async";
const std::string kCmdNameSlotsMgrtExecWrapper = "slotsmgrt-exec-wrapper";
const std::string kCmdNameSlotsMgrtAsyncStatus = "slotsmgrt-async-status";
const std::string kCmdNameSlotsMgrtAsyncCancel = "slotsmgrt-async-cancel";

// Kv
const std::string kCmdNameSet = "set";
const std::string kCmdNameGet = "get";
Expand Down Expand Up @@ -230,22 +231,19 @@ const std::string kCmdNameUnSubscribe = "unsubscribe";
const std::string kCmdNamePubSub = "pubsub";
const std::string kCmdNamePSubscribe = "psubscribe";
const std::string kCmdNamePUnSubscribe = "punsubscribe";

const std::string kClusterPrefix = "pkcluster";

using PikaCmdArgsType = net::RedisCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

enum CmdFlagsMask {
kCmdFlagsMaskRW = 1,
kCmdFlagsMaskType = 30,
kCmdFlagsMaskLocal = 32,
kCmdFlagsMaskSuspend = 64,
kCmdFlagsMaskPrior = 128,
kCmdFlagsMaskAdminRequire = 256,
kCmdFlagsMaskDoThrouhDB = 4096,
kCmdFlagsMaskReadCache = 128,
kCmdFlagsMaskUpdateCache = 2048,
kCmdFlagsMaskSlot = 1536,
};

enum CmdFlags {
Expand Down
37 changes: 22 additions & 15 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return small_compaction_threshold_;
}
int small_compaction_duration_threshold() {
std::shared_lock l(rwlock_);
return small_compaction_duration_threshold_;
}
int max_background_flushes() {
std::shared_lock l(rwlock_);
return max_background_flushes_;
Expand Down Expand Up @@ -425,6 +429,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("small-compaction-threshold", std::to_string(value));
small_compaction_threshold_ = value;
}
void SetSmallCompactionDurationThreshold(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("small-compaction-duration-threshold", std::to_string(value));
small_compaction_duration_threshold_ = value;
}
void SetMaxClientResponseSize(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-client-response-size", std::to_string(value));
Expand Down Expand Up @@ -477,7 +486,7 @@ class PikaConf : public pstd::BaseConf {
}
void SetSlotMigrate(const std::string &value) {
std::lock_guard l(rwlock_);
slotmigrate_ = (value == "yes") ? true : false;
slotmigrate_ = (value == "yes");
}
void SetExpireLogsNums(const int value) {
std::lock_guard l(rwlock_);
Expand Down Expand Up @@ -602,32 +611,31 @@ class PikaConf : public pstd::BaseConf {
max_rsync_parallel_num_ = value;
}

int64_t cache_maxmemory() { return cache_maxmemory_; }
void SetCacheType(const std::string &value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_pos() { return zset_cache_start_pos_; }
int zset_cache_field_num_per_key() { return zset_cache_field_num_per_key_; }
int64_t cache_maxmemory() { return cache_maxmemory_; }
int cache_maxmemory_policy() { return cache_maxmemory_policy_; }
int cache_maxmemory_samples() { return cache_maxmemory_samples_; }
int cache_lfu_decay_time() { return cache_lfu_decay_time_; }

int Load();
int ConfigRewrite();
int ConfigRewriteReplicationID();
private:

private:
int port_ = 0;
std::string slaveof_;
int slave_priority_ = 0;
int thread_num_ = 0;
int thread_pool_size_ = 0;
int sync_thread_num_ = 0;
int expire_dump_days_ = 3;
int db_sync_speed_ = 0;
std::string slaveof_;
std::string log_path_;
std::string log_level_;
std::string db_path_;
std::string db_sync_path_;
int expire_dump_days_ = 3;
int db_sync_speed_ = 0;
std::string compact_cron_;
std::string compact_interval_;
int64_t resume_check_interval_ = 60; // seconds
Expand Down Expand Up @@ -675,6 +683,7 @@ class PikaConf : public pstd::BaseConf {

int max_cache_statistic_keys_ = 0;
int small_compaction_threshold_ = 0;
int small_compaction_duration_threshold_ = 0;
int max_background_flushes_ = 0;
int max_background_compactions_ = 0;
int max_background_jobs_ = 0;
Expand Down Expand Up @@ -712,10 +721,11 @@ class PikaConf : public pstd::BaseConf {
int binlog_file_size_ = 0;

// cache
std::vector<std::string> cache_type_;
std::atomic_bool tmp_cache_disable_flag_;
std::atomic_int64_t cache_maxmemory_;
std::atomic_int cache_num_;
std::atomic_int cache_model_;
std::atomic_bool tmp_cache_disable_flag_;
std::vector<std::string> cache_type_;
std::atomic_int cache_string_;
std::atomic_int cache_set_;
std::atomic_int cache_zset_;
Expand All @@ -724,25 +734,22 @@ class PikaConf : public pstd::BaseConf {
std::atomic_int cache_bit_;
std::atomic_int zset_cache_start_pos_;
std::atomic_int zset_cache_field_num_per_key_;
std::atomic_int64_t cache_maxmemory_;
std::atomic_int cache_maxmemory_policy_;
std::atomic_int cache_maxmemory_samples_;
std::atomic_int cache_lfu_decay_time_;


// rocksdb blob
bool enable_blob_files_ = false;
int64_t min_blob_size_ = 4096; // 4K
int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M
std::string blob_compression_type_ = "none";
bool enable_blob_garbage_collection_ = false;
double blob_garbage_collection_age_cutoff_ = 0.25;
double blob_garbage_collection_force_threshold_ = 1.0;
int64_t min_blob_size_ = 4096; // 4K
int64_t blob_cache_ = 0;
int64_t blob_num_shard_bits_ = 0;
int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M
std::string blob_compression_type_ = "none";

std::unique_ptr<PikaMeta> local_meta_;

std::shared_mutex rwlock_;

// Rsync Rate limiting configuration
Expand Down
8 changes: 0 additions & 8 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class Context : public pstd::noncopyable {
pstd::Status Init();
// RWLock should be held when access members.
pstd::Status StableSave();
void PrepareUpdateAppliedIndex(const LogOffset& offset);
void UpdateAppliedIndex(const LogOffset& offset);
void Reset(const LogOffset& offset);

Expand Down Expand Up @@ -79,7 +78,6 @@ class MemLog {
logs_.push_back(item);
last_offset_ = item.offset;
}
pstd::Status PurgeLogs(const LogOffset& offset, std::vector<LogItem>* logs);
pstd::Status TruncateTo(const LogOffset& offset);

void Reset(const LogOffset& offset);
Expand Down Expand Up @@ -136,11 +134,6 @@ class ConsensusCoordinator {
return committed_index_;
}

LogOffset applied_index() {
std::shared_lock lock(context_->rwlock_);
return context_->applied_index_;
}

std::shared_ptr<Context> context() { return context_; }

// redis parser cb
Expand Down Expand Up @@ -203,7 +196,6 @@ class ConsensusCoordinator {
uint32_t term_ = 0;

std::string db_name_;
uint32_t slot_id_ = 0;

SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
Expand Down
2 changes: 0 additions & 2 deletions include/pika_data_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ class PikaDataDistribution {
virtual ~PikaDataDistribution() = default;
// Initialization
virtual void Init() = 0;
virtual uint32_t Distribute(const std::string& str) = 0;
};

class HashModulo : public PikaDataDistribution {
public:
~HashModulo() override = default;
void Init() override;
uint32_t Distribute(const std::string& str) override;
};

#endif
Loading

0 comments on commit 09cca3b

Please sign in to comment.