From e1030fc5d6cbdc432605c78ffb9e9b0d7ab8445b Mon Sep 17 00:00:00 2001 From: wuxianrong Date: Mon, 11 Dec 2023 14:36:47 +0800 Subject: [PATCH] delets kv slot --- include/pika_command.h | 18 +- include/pika_db.h | 10 +- include/pika_kv.h | 256 +++++++++++----------- include/pika_slot_command.h | 62 +++--- src/pika_command.cc | 90 ++------ src/pika_db.cc | 5 + src/pika_kv.cc | 420 ++++++++++++++++++------------------ src/pika_slot_command.cc | 12 +- 8 files changed, 416 insertions(+), 457 deletions(-) diff --git a/include/pika_command.h b/include/pika_command.h index b8215dae2b..1396a79fca 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -17,6 +17,7 @@ #include "pstd/include/pstd_string.h" #include "include/pika_slot.h" +#include "include/pika_db.h" #include "net/src/dispatch_thread.h" class SyncMasterSlot; @@ -481,15 +482,14 @@ class Cmd : public std::enable_shared_from_this { virtual std::vector current_key() const; virtual void Execute(); virtual void ProcessSingleSlotCmd(); - virtual void ProcessMultiSlotCmd(); - virtual void Do(std::shared_ptr slot = nullptr) = 0; - virtual void DoThroughDB(std::shared_ptr slot = nullptr) {} - virtual void DoUpdateCache(std::shared_ptr slot = nullptr) {} - virtual void ReadCache(std::shared_ptr slot = nullptr) {} + virtual void Do(std::shared_ptr db) = 0; + virtual void DoThroughDB(std::shared_ptr db) {} + virtual void DoUpdateCache(std::shared_ptr db) {} + virtual void ReadCache(std::shared_ptr db) {} rocksdb::Status CmdStatus() { return s_; }; virtual Cmd* Clone() = 0; // used for execute multikey command into different slots - virtual void Split(std::shared_ptr slot, const HintKeys& hint_keys) = 0; + virtual void Split(std::shared_ptr db, const HintKeys& hint_keys) = 0; virtual void Merge() = 0; void Initial(const PikaCmdArgsType& argv, const std::string& db_name); @@ -531,11 +531,11 @@ class Cmd : public std::enable_shared_from_this { protected: // enable copy, used default copy // Cmd(const Cmd&); - void ProcessCommand(const std::shared_ptr& slot, const std::shared_ptr& sync_slot, + void ProcessCommand(const std::shared_ptr& db, const std::shared_ptr& sync_slot, const HintKeys& hint_key = HintKeys()); - void InternalProcessCommand(const std::shared_ptr& slot, const std::shared_ptr& sync_slot, + void InternalProcessCommand(const std::shared_ptr& db, const std::shared_ptr& sync_slot, const HintKeys& hint_key); - void DoCommand(const std::shared_ptr& slot, const HintKeys& hint_key); + void DoCommand(const std::shared_ptr& db, const HintKeys& hint_key); bool CheckArg(uint64_t num) const; void LogCommand() const; diff --git a/include/pika_db.h b/include/pika_db.h index 339718d2f7..551ee90fcc 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -24,6 +24,7 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { friend class PikaServer; std::string GetDBName(); + std::shared_ptr storage() const; void BgSaveDB(); void CompactDB(const storage::DataType& type); bool FlushSlotDB(); @@ -33,6 +34,7 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { bool IsBinlogIoError(); uint32_t SlotNum(); void GetAllSlots(std::set& slot_ids); + std::shared_ptr cache() const; std::shared_mutex& GetSlotLock() { return slots_rw_; } @@ -76,12 +78,18 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { std::map> GetSlots() { return slots_; } + std::shared_ptr LockMgr(); + void DbRWLockReader(); + void DbRWUnLock(); private: std::string db_name_; uint32_t slot_num_ = 0; std::string db_path_; std::string log_path_; - + std::shared_ptr lock_mgr_; + std::shared_mutex db_rwlock_; + std::shared_ptr storage_; + std::shared_ptr cache_; std::atomic binlog_io_error_; // lock order // slots_rw_ > key_scan_protector_ diff --git a/include/pika_kv.h b/include/pika_kv.h index b674d71acf..78d8562eeb 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -23,10 +23,10 @@ class SetCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetCmd(*this); } private: @@ -55,11 +55,11 @@ class GetCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GetCmd(*this); } @@ -74,11 +74,11 @@ class GetCmd : public Cmd { class DelCmd : public Cmd { public: DelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){}; - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; std::vector current_key() const override { return keys_; } - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new DelCmd(*this); } void DoBinlog(const std::shared_ptr& slot) override; @@ -98,10 +98,10 @@ class IncrCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrCmd(*this); } @@ -120,10 +120,10 @@ class IncrbyCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrbyCmd(*this); } @@ -142,10 +142,10 @@ class IncrbyfloatCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrbyfloatCmd(*this); } @@ -164,10 +164,10 @@ class DecrCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new DecrCmd(*this); } @@ -186,10 +186,10 @@ class DecrbyCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new DecrbyCmd(*this); } @@ -208,10 +208,10 @@ class GetsetCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GetsetCmd(*this); } @@ -230,10 +230,10 @@ class AppendCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new AppendCmd(*this); } @@ -247,12 +247,12 @@ class AppendCmd : public Cmd { class MgetCmd : public Cmd { public: MgetCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){}; - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; std::vector current_key() const override { return keys_; } - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new MgetCmd(*this); } @@ -270,8 +270,8 @@ class MgetCmd : public Cmd { class KeysCmd : public Cmd { public: KeysCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new KeysCmd(*this); } @@ -291,8 +291,8 @@ class SetnxCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SetnxCmd(*this); } @@ -313,10 +313,10 @@ class SetexCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetexCmd(*this); } @@ -337,10 +337,10 @@ class PsetexCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PsetexCmd(*this); } @@ -361,8 +361,8 @@ class DelvxCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DelvxCmd(*this); } @@ -383,9 +383,9 @@ class MsetCmd : public Cmd { set_cmd_ = std::make_shared(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv); } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; std::vector current_key() const override { std::vector res; for (auto& kv : kvs_) { @@ -393,7 +393,7 @@ class MsetCmd : public Cmd { } return res; } - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new MsetCmd(*this); } void DoBinlog(const std::shared_ptr& slot) override; @@ -422,8 +422,8 @@ class MsetnxCmd : public Cmd { } return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new MsetnxCmd(*this); } void DoBinlog(const std::shared_ptr& slot) override; @@ -444,11 +444,11 @@ class GetrangeCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GetrangeCmd(*this); } @@ -470,10 +470,10 @@ class SetrangeCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetrangeCmd(*this); } @@ -493,11 +493,11 @@ class StrlenCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new StrlenCmd(*this); } @@ -512,11 +512,11 @@ class StrlenCmd : public Cmd { class ExistsCmd : public Cmd { public: ExistsCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; std::vector current_key() const override { return keys_; } - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new ExistsCmd(*this); } @@ -535,10 +535,10 @@ class ExpireCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ExpireCmd(*this); } @@ -558,10 +558,10 @@ class PexpireCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PexpireCmd(*this); } @@ -581,10 +581,10 @@ class ExpireatCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ExpireatCmd(*this); } @@ -603,10 +603,10 @@ class PexpireatCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PexpireatCmd(*this); } @@ -626,10 +626,10 @@ class TtlCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new TtlCmd(*this); } @@ -647,10 +647,10 @@ class PttlCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PttlCmd(*this); } @@ -668,10 +668,10 @@ class PersistCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void DoUpdateCache(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void DoUpdateCache(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PersistCmd(*this); } @@ -689,10 +689,10 @@ class TypeCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void ReadCache(std::shared_ptr slot = nullptr) override; - void DoThroughDB(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Do(std::shared_ptr db) override; + void ReadCache(std::shared_ptr db) override; + void DoThroughDB(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new TypeCmd(*this); } @@ -710,8 +710,8 @@ class PTypeCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PTypeCmd(*this); } @@ -724,8 +724,8 @@ class PTypeCmd : public Cmd { class ScanCmd : public Cmd { public: ScanCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag), pattern_("*") {} - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ScanCmd(*this); } @@ -746,8 +746,8 @@ class ScanCmd : public Cmd { class ScanxCmd : public Cmd { public: ScanxCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag), pattern_("*") {} - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ScanxCmd(*this); } @@ -772,8 +772,8 @@ class PKSetexAtCmd : public Cmd { res.push_back(key_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKSetexAtCmd(*this); } @@ -794,8 +794,8 @@ class PKScanRangeCmd : public Cmd { res.push_back(key_start_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKScanRangeCmd(*this); } @@ -823,8 +823,8 @@ class PKRScanRangeCmd : public Cmd { res.push_back(key_start_); return res; } - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKRScanRangeCmd(*this); } diff --git a/include/pika_slot_command.h b/include/pika_slot_command.h index eca67377a4..02e6a1bad8 100644 --- a/include/pika_slot_command.h +++ b/include/pika_slot_command.h @@ -22,9 +22,9 @@ extern uint32_t CRC32Update(uint32_t crc, const char *buf, int len); extern uint32_t CRC32CheckSum(const char *buf, int len); int GetSlotID(const std::string &str); -int GetKeyType(const std::string& key, std::string &key_type, const std::shared_ptr& slot); -void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr& slot); -void RemSlotKey(const std::string& key, const std::shared_ptr& slot); +int GetKeyType(const std::string& key, std::string &key_type, const std::shared_ptr& db); +void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr& db); +void RemSlotKey(const std::string& key, const std::shared_ptr& db); int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr& slot); std::string GetSlotKey(int slot); std::string GetSlotsTagKey(uint32_t crc); @@ -75,8 +75,8 @@ class PikaMigrate { class SlotsMgrtTagSlotCmd : public Cmd { public: SlotsMgrtTagSlotCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagSlotCmd(*this); } private: @@ -92,8 +92,8 @@ class SlotsMgrtTagSlotCmd : public Cmd { class SlotsMgrtTagSlotAsyncCmd : public Cmd { public: SlotsMgrtTagSlotAsyncCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagSlotAsyncCmd(*this); } private: @@ -111,8 +111,8 @@ class SlotsMgrtTagSlotAsyncCmd : public Cmd { class SlotsMgrtTagOneCmd : public Cmd { public: SlotsMgrtTagOneCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagOneCmd(*this); } private: @@ -129,8 +129,8 @@ class SlotsMgrtTagOneCmd : public Cmd { class SlotsMgrtAsyncStatusCmd : public Cmd { public: SlotsMgrtAsyncStatusCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db = nullptr) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtAsyncStatusCmd(*this); } @@ -141,8 +141,8 @@ class SlotsMgrtAsyncStatusCmd : public Cmd { class SlotsInfoCmd : public Cmd { public: SlotsInfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsInfoCmd(*this); } private: @@ -155,8 +155,8 @@ class SlotsInfoCmd : public Cmd { class SlotsMgrtAsyncCancelCmd : public Cmd { public: SlotsMgrtAsyncCancelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtAsyncCancelCmd(*this); } private: @@ -166,8 +166,8 @@ class SlotsMgrtAsyncCancelCmd : public Cmd { class SlotsDelCmd : public Cmd { public: SlotsDelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsDelCmd(*this); } private: @@ -178,8 +178,8 @@ class SlotsDelCmd : public Cmd { class SlotsHashKeyCmd : public Cmd { public: SlotsHashKeyCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsHashKeyCmd(*this); } private: @@ -190,8 +190,8 @@ class SlotsHashKeyCmd : public Cmd { class SlotsScanCmd : public Cmd { public: SlotsScanCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsScanCmd(*this); } private: @@ -214,8 +214,8 @@ class SlotsScanCmd : public Cmd { class SlotsMgrtExecWrapperCmd : public Cmd { public: SlotsMgrtExecWrapperCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtExecWrapperCmd(*this); } private: @@ -228,8 +228,8 @@ class SlotsMgrtExecWrapperCmd : public Cmd { class SlotsReloadCmd : public Cmd { public: SlotsReloadCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsReloadCmd(*this); } private: @@ -239,8 +239,8 @@ class SlotsReloadCmd : public Cmd { class SlotsReloadOffCmd : public Cmd { public: SlotsReloadOffCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptrslot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsReloadOffCmd(*this); } private: @@ -250,8 +250,8 @@ class SlotsReloadOffCmd : public Cmd { class SlotsCleanupCmd : public Cmd { public: SlotsCleanupCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsCleanupCmd(*this); } std::vector cleanup_slots_; @@ -262,8 +262,8 @@ class SlotsCleanupCmd : public Cmd { class SlotsCleanupOffCmd : public Cmd { public: SlotsCleanupOffCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; + void Do(std::shared_ptr db) override; + void Split(std::shared_ptr db, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsCleanupOffCmd(*this); } private: diff --git a/src/pika_command.cc b/src/pika_command.cc index 3688623bcb..bd9553e20a 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -745,7 +745,7 @@ void Cmd::Execute() { void Cmd::ProcessSingleSlotCmd() { std::shared_ptr slot; slot = g_pika_server->GetSlotByDBName(db_name_); - + std::shared_ptr db = g_pika_server->GetDB(db_name_); if (!slot) { res_.SetRes(CmdRes::kErrOther, "Slot not found"); return; @@ -757,25 +757,25 @@ void Cmd::ProcessSingleSlotCmd() { res_.SetRes(CmdRes::kErrOther, "Slot not found"); return; } - ProcessCommand(slot, sync_slot); + ProcessCommand(db, sync_slot); } -void Cmd::ProcessCommand(const std::shared_ptr& slot, const std::shared_ptr& sync_slot, +void Cmd::ProcessCommand(const std::shared_ptr& db, const std::shared_ptr& sync_slot, const HintKeys& hint_keys) { if (stage_ == kNone) { - InternalProcessCommand(slot, sync_slot, hint_keys); + InternalProcessCommand(db, sync_slot, hint_keys); } else { if (stage_ == kBinlogStage) { DoBinlog(sync_slot); } else if (stage_ == kExecuteStage) { - DoCommand(slot, hint_keys); + DoCommand(db, hint_keys); } } } -void Cmd::InternalProcessCommand(const std::shared_ptr& slot, const std::shared_ptr& sync_slot, +void Cmd::InternalProcessCommand(const std::shared_ptr& db, const std::shared_ptr& sync_slot, const HintKeys& hint_keys) { - pstd::lock::MultiRecordLock record_lock(slot->LockMgr()); + pstd::lock::MultiRecordLock record_lock(db->LockMgr()); if (is_write()) { record_lock.Lock(current_key()); } @@ -784,7 +784,7 @@ void Cmd::InternalProcessCommand(const std::shared_ptr& slot, const std::s if (g_pika_conf->slowlog_slower_than() >= 0) { start_us = pstd::NowMicros(); } - DoCommand(slot, hint_keys); + DoCommand(db, hint_keys); if (g_pika_conf->slowlog_slower_than() >= 0) { do_duration_ += pstd::NowMicros() - start_us; } @@ -796,34 +796,34 @@ void Cmd::InternalProcessCommand(const std::shared_ptr& slot, const std::s } } -void Cmd::DoCommand(const std::shared_ptr& slot, const HintKeys& hint_keys) { +void Cmd::DoCommand(const std::shared_ptr& db, const HintKeys& hint_keys) { if (!IsSuspend()) { - slot->DbRWLockReader(); + db->DbRWLockReader(); } DEFER { if (!IsSuspend()) { - slot->DbRWUnLock(); + db->DbRWUnLock(); } }; if (IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_model() - && slot->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { + && db->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { if (IsNeedReadCache()) { - ReadCache(slot); + ReadCache(db); } if (is_read() && res().CacheMiss()) { - DoThroughDB(slot); + DoThroughDB(db); if (IsNeedUpdateCache()) { - DoUpdateCache(slot); + DoUpdateCache(db); } } else if (is_write()) { - DoThroughDB(slot); + DoThroughDB(db); if (IsNeedUpdateCache()) { - DoUpdateCache(slot); + DoUpdateCache(db); } } } else { - Do(slot); + Do(db); } } @@ -854,60 +854,6 @@ void Cmd::DoBinlog(const std::shared_ptr& slot) { } } -void Cmd::ProcessMultiSlotCmd() { - std::shared_ptr slot; - std::vector cur_key = current_key(); - if (cur_key.empty()) { - res_.SetRes(CmdRes::kErrOther, "Internal Error"); - return; - } - - int hint = 0; - std::unordered_map process_map; - // split cur_key into slots - std::shared_ptr db = g_pika_server->GetDB(db_name_); - if (!db) { - res_.SetRes(CmdRes::kErrOther, "DB not found"); - return; - } - - CmdStage current_stage = stage_; - for (auto& key : cur_key) { - // in sharding mode we select slot by key - uint32_t slot_id = g_pika_cmd_table_manager->DistributeKey(key, db->SlotNum()); - auto iter = process_map.find(slot_id); - if (iter == process_map.end()) { - std::shared_ptr slot = db->GetSlotById(slot_id); - if (!slot) { - res_.SetRes(CmdRes::kErrOther, "Slot not found"); - return; - } - std::shared_ptr sync_slot = - g_pika_rm->GetSyncMasterSlotByName(SlotInfo(slot->GetDBName(), slot->GetSlotID())); - if (!sync_slot) { - res_.SetRes(CmdRes::kErrOther, "Slot not found"); - return; - } - HintKeys hint_keys; - hint_keys.Push(key, hint); - process_map[slot_id] = ProcessArg(slot, sync_slot, hint_keys); - } else { - iter->second.hint_keys.Push(key, hint); - } - hint++; - } - for (auto& iter : process_map) { - ProcessArg& arg = iter.second; - ProcessCommand(arg.slot, arg.sync_slot, arg.hint_keys); - if (!res_.ok()) { - return; - } - } - if (current_stage == kNone || current_stage == kExecuteStage) { - Merge(); - } -} - bool Cmd::is_read() const { return ((flag_ & kCmdFlagsMaskRW) == kCmdFlagsRead); } bool Cmd::is_write() const { return ((flag_ & kCmdFlagsMaskRW) == kCmdFlagsWrite); } bool Cmd::IsLocal() const { return ((flag_ & kCmdFlagsMaskLocal) == kCmdFlagsLocal); } diff --git a/src/pika_db.cc b/src/pika_db.cc index 677b0bc8aa..4c2cefc8c9 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -82,6 +82,11 @@ bool DB::FlushSlotSubDB(const std::string& db_name) { void DB::SetBinlogIoError() { return binlog_io_error_.store(true); } void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); } bool DB::IsBinlogIoError() { return binlog_io_error_.load(); } +std::shared_ptr Slot::LockMgr() { return lock_mgr_; } +void DB::DbRWLockReader() { db_rwlock_.lock_shared(); } +void DB::DbRWUnLock() { db_rwlock_.unlock(); } +std::shared_ptr DB::cache() const { return cache_; } +std::shared_ptr DB::storage() const { return storage_; } uint32_t DB::SlotNum() { return slot_num_; } diff --git a/src/pika_kv.cc b/src/pika_kv.cc index 91b35c878f..a515e338dd 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -67,23 +67,23 @@ void SetCmd::DoInitial() { } } -void SetCmd::Do(std::shared_ptr slot) { +void SetCmd::Do(std::shared_ptr db) { int32_t res = 1; switch (condition_) { case SetCmd::kXX: - s_ = slot->db()->Setxx(key_, value_, &res, static_cast(sec_)); + s_ = db->storage()->Setxx(key_, value_, &res, static_cast(sec_)); break; case SetCmd::kNX: - s_ = slot->db()->Setnx(key_, value_, &res, static_cast(sec_)); + s_ = db->storage()->Setnx(key_, value_, &res, static_cast(sec_)); break; case SetCmd::kVX: - s_ = slot->db()->Setvx(key_, target_, value_, &success_, static_cast(sec_)); + s_ = db->storage()->Setvx(key_, target_, value_, &success_, static_cast(sec_)); break; case SetCmd::kEXORPX: - s_ = slot->db()->Setex(key_, value_, static_cast(sec_)); + s_ = db->storage()->Setex(key_, value_, static_cast(sec_)); break; default: - s_ = slot->db()->Set(key_, value_); + s_ = db->storage()->Set(key_, value_); break; } @@ -93,7 +93,7 @@ void SetCmd::Do(std::shared_ptr slot) { } else { if (res == 1) { res_.SetRes(CmdRes::kOk); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else { res_.AppendStringLen(-1); } @@ -103,20 +103,20 @@ void SetCmd::Do(std::shared_ptr slot) { } } -void SetCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void SetCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void SetCmd::DoUpdateCache(std::shared_ptr slot) { +void SetCmd::DoUpdateCache(std::shared_ptr db) { if (SetCmd::kNX == condition_) { return; } if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; if (has_ttl_) { - slot->cache()->Setxx(CachePrefixKeyK, value_, sec_); + db->cache()->Setxx(CachePrefixKeyK, value_, sec_); } else { - slot->cache()->SetxxWithoutTTL(CachePrefixKeyK, value_); + db->cache()->SetxxWithoutTTL(CachePrefixKeyK, value_); } } } @@ -158,8 +158,8 @@ void GetCmd::DoInitial() { key_ = argv_[1]; } -void GetCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->GetWithTTL(key_, &value_, &sec_); +void GetCmd::Do(std::shared_ptr db) { + s_ = db->storage()->GetWithTTL(key_, &value_, &sec_); if (s_.ok()) { res_.AppendStringLenUint64(value_.size()); res_.AppendContent(value_); @@ -170,9 +170,9 @@ void GetCmd::Do(std::shared_ptr slot) { } } -void GetCmd::ReadCache(std::shared_ptr slot) { +void GetCmd::ReadCache(std::shared_ptr db) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - auto s = slot->cache()->Get(CachePrefixKeyK, &value_); + auto s = db->cache()->Get(CachePrefixKeyK, &value_); if (s.ok()) { res_.AppendStringLen(value_.size()); res_.AppendContent(value_); @@ -181,15 +181,15 @@ void GetCmd::ReadCache(std::shared_ptr slot) { } } -void GetCmd::DoThroughDB(std::shared_ptr slot) { +void GetCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - Do(slot); + Do(db); } -void GetCmd::DoUpdateCache(std::shared_ptr slot) { +void GetCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->WriteKVToCache(CachePrefixKeyK, value_, sec_); + db->cache()->WriteKVToCache(CachePrefixKeyK, value_, sec_); } } @@ -202,15 +202,15 @@ void DelCmd::DoInitial() { keys_.assign(++iter, argv_.end()); } -void DelCmd::Do(std::shared_ptr slot) { +void DelCmd::Do(std::shared_ptr db) { std::map type_status; - int64_t count = slot->db()->Del(keys_, &type_status); + int64_t count = db->storage()->Del(keys_, &type_status); if (count >= 0) { res_.AppendInteger(count); s_ = rocksdb::Status::OK(); std::vector::const_iterator it; for (it = keys_.begin(); it != keys_.end(); it++) { - RemSlotKey(*it, slot); + RemSlotKey(*it, db); } } else { res_.SetRes(CmdRes::kErrOther, "delete error"); @@ -218,11 +218,11 @@ void DelCmd::Do(std::shared_ptr slot) { } } -void DelCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void DelCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void DelCmd::DoUpdateCache(std::shared_ptr slot) { +void DelCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::vector v; for (auto key : keys_) { @@ -232,13 +232,13 @@ void DelCmd::DoUpdateCache(std::shared_ptr slot) { v.emplace_back(PCacheKeyPrefixS + key); v.emplace_back(PCacheKeyPrefixH + key); } - slot->cache()->Del(v); + db->cache()->Del(v); } } -void DelCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { +void DelCmd::Split(std::shared_ptr db, const HintKeys& hint_keys) { std::map type_status; - int64_t count = slot->db()->Del(hint_keys.keys, &type_status); + int64_t count = db->storage()->Del(hint_keys.keys, &type_status); if (count >= 0) { split_res_ += count; } else { @@ -266,11 +266,11 @@ void IncrCmd::DoInitial() { key_ = argv_[1]; } -void IncrCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->Incrby(key_, 1, &new_value_); +void IncrCmd::Do(std::shared_ptr db) { + s_ = db->storage()->Incrby(key_, 1, &new_value_); if (s_.ok()) { res_.AppendContent(":" + std::to_string(new_value_)); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else if (s_.IsCorruption() && s_.ToString() == "Corruption: Value is not a integer") { res_.SetRes(CmdRes::kInvalidInt); } else if (s_.IsInvalidArgument()) { @@ -280,14 +280,14 @@ void IncrCmd::Do(std::shared_ptr slot) { } } -void IncrCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void IncrCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void IncrCmd::DoUpdateCache(std::shared_ptr slot) { +void IncrCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Incrxx(CachePrefixKeyK); + db->cache()->Incrxx(CachePrefixKeyK); } } @@ -303,11 +303,11 @@ void IncrbyCmd::DoInitial() { } } -void IncrbyCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->Incrby(key_, by_, &new_value_); +void IncrbyCmd::Do(std::shared_ptr db) { + s_ = db->storage()->Incrby(key_, by_, &new_value_); if (s_.ok()) { res_.AppendContent(":" + std::to_string(new_value_)); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else if (s_.IsCorruption() && s_.ToString() == "Corruption: Value is not a integer") { res_.SetRes(CmdRes::kInvalidInt); } else if (s_.IsInvalidArgument()) { @@ -317,14 +317,14 @@ void IncrbyCmd::Do(std::shared_ptr slot) { } } -void IncrbyCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void IncrbyCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void IncrbyCmd::DoUpdateCache(std::shared_ptr slot) { +void IncrbyCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->IncrByxx(CachePrefixKeyK, by_); + db->cache()->IncrByxx(CachePrefixKeyK, by_); } } @@ -341,12 +341,12 @@ void IncrbyfloatCmd::DoInitial() { } } -void IncrbyfloatCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->Incrbyfloat(key_, value_, &new_value_); +void IncrbyfloatCmd::Do(std::shared_ptr db) { + s_ = db->storage()->Incrbyfloat(key_, value_, &new_value_); if (s_.ok()) { res_.AppendStringLenUint64(new_value_.size()); res_.AppendContent(new_value_); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else if (s_.IsCorruption() && s_.ToString() == "Corruption: Value is not a vaild float") { res_.SetRes(CmdRes::kInvalidFloat); } else if (s_.IsInvalidArgument()) { @@ -356,16 +356,16 @@ void IncrbyfloatCmd::Do(std::shared_ptr slot) { } } -void IncrbyfloatCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void IncrbyfloatCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void IncrbyfloatCmd::DoUpdateCache(std::shared_ptr slot) { +void IncrbyfloatCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { long double long_double_by; if (storage::StrToLongDouble(value_.data(), value_.size(), &long_double_by) != -1) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Incrbyfloatxx(CachePrefixKeyK, long_double_by); + db->cache()->Incrbyfloatxx(CachePrefixKeyK, long_double_by); } } } @@ -378,8 +378,8 @@ void DecrCmd::DoInitial() { key_ = argv_[1]; } -void DecrCmd::Do(std::shared_ptr slot) { - s_= slot->db()->Decrby(key_, 1, &new_value_); +void DecrCmd::Do(std::shared_ptr db) { + s_= db->storage()->Decrby(key_, 1, &new_value_); if (s_.ok()) { res_.AppendContent(":" + std::to_string(new_value_)); } else if (s_.IsCorruption() && s_.ToString() == "Corruption: Value is not a integer") { @@ -391,14 +391,14 @@ void DecrCmd::Do(std::shared_ptr slot) { } } -void DecrCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void DecrCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void DecrCmd::DoUpdateCache(std::shared_ptr slot) { +void DecrCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Decrxx(CachePrefixKeyK); + db->cache()->Decrxx(CachePrefixKeyK); } } @@ -414,10 +414,10 @@ void DecrbyCmd::DoInitial() { } } -void DecrbyCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->Decrby(key_, by_, &new_value_); +void DecrbyCmd::Do(std::shared_ptr db) { + s_ = db->storage()->Decrby(key_, by_, &new_value_); if (s_.ok()) { - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); res_.AppendContent(":" + std::to_string(new_value_)); } else if (s_.IsCorruption() && s_.ToString() == "Corruption: Value is not a integer") { res_.SetRes(CmdRes::kInvalidInt); @@ -428,14 +428,14 @@ void DecrbyCmd::Do(std::shared_ptr slot) { } } -void DecrbyCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void DecrbyCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void DecrbyCmd::DoUpdateCache(std::shared_ptr slot) { +void DecrbyCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->DecrByxx(CachePrefixKeyK, by_); + db->cache()->DecrByxx(CachePrefixKeyK, by_); } } @@ -448,9 +448,9 @@ void GetsetCmd::DoInitial() { new_value_ = argv_[2]; } -void GetsetCmd::Do(std::shared_ptr slot) { +void GetsetCmd::Do(std::shared_ptr db) { std::string old_value; - s_ = slot->db()->GetSet(key_, new_value_, &old_value); + s_ = db->storage()->GetSet(key_, new_value_, &old_value); if (s_.ok()) { if (old_value.empty()) { res_.AppendContent("$-1"); @@ -458,20 +458,20 @@ void GetsetCmd::Do(std::shared_ptr slot) { res_.AppendStringLenUint64(old_value.size()); res_.AppendContent(old_value); } - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } -void GetsetCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void GetsetCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void GetsetCmd::DoUpdateCache(std::shared_ptr slot) { +void GetsetCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->SetxxWithoutTTL(CachePrefixKeyK, new_value_); + db->cache()->SetxxWithoutTTL(CachePrefixKeyK, new_value_); } } @@ -484,25 +484,25 @@ void AppendCmd::DoInitial() { value_ = argv_[2]; } -void AppendCmd::Do(std::shared_ptr slot) { +void AppendCmd::Do(std::shared_ptr db) { int32_t new_len = 0; - s_ = slot->db()->Append(key_, value_, &new_len); + s_ = db->storage()->Append(key_, value_, &new_len); if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(new_len); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } -void AppendCmd::DoThroughDB(std::shared_ptr slot){ - Do(slot); +void AppendCmd::DoThroughDB(std::shared_ptr db){ + Do(db); } -void AppendCmd::DoUpdateCache(std::shared_ptr slot) { +void AppendCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Appendxx(CachePrefixKeyK, value_); + db->cache()->Appendxx(CachePrefixKeyK, value_); } } @@ -516,9 +516,9 @@ void MgetCmd::DoInitial() { split_res_.resize(keys_.size()); } -void MgetCmd::Do(std::shared_ptr slot) { +void MgetCmd::Do(std::shared_ptr db) { db_value_status_array_.clear(); - s_ = slot->db()->MGet(keys_, &db_value_status_array_); + s_ = db->storage()->MGet(keys_, &db_value_status_array_); if (s_.ok()) { res_.AppendArrayLenUint64(db_value_status_array_.size()); for (const auto& vs : db_value_status_array_) { @@ -534,13 +534,13 @@ void MgetCmd::Do(std::shared_ptr slot) { } } -void MgetCmd::ReadCache(std::shared_ptr slot) { +void MgetCmd::ReadCache(std::shared_ptr db) { std::vector CachePrefixKeyK; cache_value_status_array_.clear(); for (auto key : keys_) { CachePrefixKeyK.push_back(PCacheKeyPrefixK + key); } - auto s = slot->cache()->MGet(CachePrefixKeyK, &cache_value_status_array_); + auto s = db->cache()->MGet(CachePrefixKeyK, &cache_value_status_array_); if (s.ok()) { res_.AppendArrayLenUint64(cache_value_status_array_.size()); for (const auto& vs : cache_value_status_array_) { @@ -556,25 +556,25 @@ void MgetCmd::ReadCache(std::shared_ptr slot) { } } -void MgetCmd::DoThroughDB(std::shared_ptr slot) { +void MgetCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - Do(slot); + Do(db); } -void MgetCmd::DoUpdateCache(std::shared_ptr slot) { +void MgetCmd::DoUpdateCache(std::shared_ptr db) { for (size_t i = 0; i < keys_.size(); i++) { if (db_value_status_array_[i].status.ok()) { std::string CachePrefixKeyK; CachePrefixKeyK = PCacheKeyPrefixK + keys_[i]; - slot->cache()->WriteKVToCache(CachePrefixKeyK, db_value_status_array_[i].value, ttl_); + db->cache()->WriteKVToCache(CachePrefixKeyK, db_value_status_array_[i].value, ttl_); } } } -void MgetCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { +void MgetCmd::Split(std::shared_ptr db, const HintKeys& hint_keys) { std::vector vss; const std::vector& keys = hint_keys.keys; - rocksdb::Status s = slot->db()->MGet(keys, &vss); + rocksdb::Status s = db->storage()->MGet(keys, &vss); if (s.ok()) { if (hint_keys.hints.size() != vss.size()) { res_.SetRes(CmdRes::kErrOther, "internal Mget return size invalid"); @@ -626,7 +626,7 @@ void KeysCmd::DoInitial() { } } -void KeysCmd::Do(std::shared_ptr slot) { +void KeysCmd::Do(std::shared_ptr db) { int64_t total_key = 0; int64_t cursor = 0; size_t raw_limit = g_pika_conf->max_client_response_size(); @@ -634,7 +634,7 @@ void KeysCmd::Do(std::shared_ptr slot) { std::vector keys; do { keys.clear(); - cursor = slot->db()->Scan(type_, cursor, pattern_, PIKA_SCAN_STEP_LENGTH, &keys); + cursor = db->storage()->Scan(type_, cursor, pattern_, PIKA_SCAN_STEP_LENGTH, &keys); for (const auto& key : keys) { RedisAppendLenUint64(raw, key.size(), "$"); RedisAppendContent(raw, key); @@ -659,12 +659,12 @@ void SetnxCmd::DoInitial() { value_ = argv_[2]; } -void SetnxCmd::Do(std::shared_ptr slot) { +void SetnxCmd::Do(std::shared_ptr db) { success_ = 0; - s_ = slot->db()->Setnx(key_, value_, &success_); + s_ = db->storage()->Setnx(key_, value_, &success_); if (s_.ok()) { res_.AppendInteger(success_); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } @@ -702,24 +702,24 @@ void SetexCmd::DoInitial() { value_ = argv_[3]; } -void SetexCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->Setex(key_, value_, static_cast(sec_)); +void SetexCmd::Do(std::shared_ptr db) { + s_ = db->storage()->Setex(key_, value_, static_cast(sec_)); if (s_.ok()) { res_.SetRes(CmdRes::kOk); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } -void SetexCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void SetexCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void SetexCmd::DoUpdateCache(std::shared_ptr slot) { +void SetexCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Setxx(CachePrefixKeyK, value_, sec_); + db->cache()->Setxx(CachePrefixKeyK, value_, sec_); } } @@ -761,8 +761,8 @@ void PsetexCmd::DoInitial() { value_ = argv_[3]; } -void PsetexCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->Setex(key_, value_, static_cast(usec_ / 1000)); +void PsetexCmd::Do(std::shared_ptr db) { + s_ = db->storage()->Setex(key_, value_, static_cast(usec_ / 1000)); if (s_.ok()) { res_.SetRes(CmdRes::kOk); } else { @@ -770,14 +770,14 @@ void PsetexCmd::Do(std::shared_ptr slot) { } } -void PsetexCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void PsetexCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void PsetexCmd::DoUpdateCache(std::shared_ptr slot) { +void PsetexCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->WriteKVToCache(CachePrefixKeyK, value_, static_cast(usec_ / 1000)); + db->cache()->WriteKVToCache(CachePrefixKeyK, value_, static_cast(usec_ / 1000)); } } @@ -815,8 +815,8 @@ void DelvxCmd::DoInitial() { value_ = argv_[2]; } -void DelvxCmd::Do(std::shared_ptr slot) { - rocksdb::Status s = slot->db()->Delvx(key_, value_, &success_); +void DelvxCmd::Do(std::shared_ptr db) { + rocksdb::Status s = db->storage()->Delvx(key_, value_, &success_); if (s.ok() || s.IsNotFound()) { res_.AppendInteger(success_); } else { @@ -840,34 +840,34 @@ void MsetCmd::DoInitial() { } } -void MsetCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->MSet(kvs_); +void MsetCmd::Do(std::shared_ptr db) { + s_ = db->storage()->MSet(kvs_); if (s_.ok()) { res_.SetRes(CmdRes::kOk); std::vector::const_iterator it; for (it = kvs_.begin(); it != kvs_.end(); it++) { - AddSlotKey("k", it->key, slot); + AddSlotKey("k", it->key, db); } } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } -void MsetCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void MsetCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void MsetCmd::DoUpdateCache(std::shared_ptr slot) { +void MsetCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK; for (auto key : kvs_) { CachePrefixKeyK = PCacheKeyPrefixK + key.key; - slot->cache()->SetxxWithoutTTL(CachePrefixKeyK, key.value); + db->cache()->SetxxWithoutTTL(CachePrefixKeyK, key.value); } } } -void MsetCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { +void MsetCmd::Split(std::shared_ptr db, const HintKeys& hint_keys) { std::vector kvs; const std::vector& keys = hint_keys.keys; const std::vector& hints = hint_keys.hints; @@ -882,7 +882,7 @@ void MsetCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { return; } } - storage::Status s = slot->db()->MSet(kvs); + storage::Status s = db->storage()->MSet(kvs); if (s.ok()) { res_.SetRes(CmdRes::kOk); } else { @@ -924,14 +924,14 @@ void MsetnxCmd::DoInitial() { } } -void MsetnxCmd::Do(std::shared_ptr slot) { +void MsetnxCmd::Do(std::shared_ptr db) { success_ = 0; - rocksdb::Status s = slot->db()->MSetnx(kvs_, &success_); + rocksdb::Status s = db->storage()->MSetnx(kvs_, &success_); if (s.ok()) { res_.AppendInteger(success_); std::vector::const_iterator it; for (it = kvs_.begin(); it != kvs_.end(); it++) { - AddSlotKey("k", it->key, slot); + AddSlotKey("k", it->key, db); } } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); @@ -973,9 +973,9 @@ void GetrangeCmd::DoInitial() { } } -void GetrangeCmd::Do(std::shared_ptr slot) { +void GetrangeCmd::Do(std::shared_ptr db) { std::string substr; - s_= slot->db()->Getrange(key_, start_, end_, &substr); + s_= db->storage()->Getrange(key_, start_, end_, &substr); if (s_.ok() || s_.IsNotFound()) { res_.AppendStringLenUint64(substr.size()); res_.AppendContent(substr); @@ -984,10 +984,10 @@ void GetrangeCmd::Do(std::shared_ptr slot) { } } -void GetrangeCmd::ReadCache(std::shared_ptr slot) { +void GetrangeCmd::ReadCache(std::shared_ptr db) { std::string substr; std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - auto s = slot->cache()->GetRange(CachePrefixKeyK, start_, end_, &substr); + auto s = db->cache()->GetRange(CachePrefixKeyK, start_, end_, &substr); if (s.ok()) { res_.AppendStringLen(substr.size()); res_.AppendContent(substr); @@ -996,10 +996,10 @@ void GetrangeCmd::ReadCache(std::shared_ptr slot) { } } -void GetrangeCmd::DoThroughDB(std::shared_ptr slot) { +void GetrangeCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); std::string substr; - s_ = slot->db()->GetrangeWithValue(key_, start_, end_, &substr, &value_, &sec_); + s_ = db->storage()->GetrangeWithValue(key_, start_, end_, &substr, &value_, &sec_); if (s_.ok()) { res_.AppendStringLen(substr.size()); res_.AppendContent(substr); @@ -1011,10 +1011,10 @@ void GetrangeCmd::DoThroughDB(std::shared_ptr slot) { } } -void GetrangeCmd::DoUpdateCache(std::shared_ptr slot) { +void GetrangeCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->WriteKVToCache(CachePrefixKeyK, value_, sec_); + db->cache()->WriteKVToCache(CachePrefixKeyK, value_, sec_); } } @@ -1031,25 +1031,25 @@ void SetrangeCmd::DoInitial() { value_ = argv_[3]; } -void SetrangeCmd::Do(std::shared_ptr slot) { +void SetrangeCmd::Do(std::shared_ptr db) { int32_t new_len = 0; - s_ = slot->db()->Setrange(key_, offset_, value_, &new_len); + s_ = db->storage()->Setrange(key_, offset_, value_, &new_len); if (s_.ok()) { res_.AppendInteger(new_len); - AddSlotKey("k", key_, slot); + AddSlotKey("k", key_, db); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } -void SetrangeCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void SetrangeCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void SetrangeCmd::DoUpdateCache(std::shared_ptr slot) { +void SetrangeCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->SetRangexx(CachePrefixKeyK, offset_, value_); + db->cache()->SetRangexx(CachePrefixKeyK, offset_, value_); } } @@ -1061,9 +1061,9 @@ void StrlenCmd::DoInitial() { key_ = argv_[1]; } -void StrlenCmd::Do(std::shared_ptr slot) { +void StrlenCmd::Do(std::shared_ptr db) { int32_t len = 0; - s_ = slot->db()->Strlen(key_, &len); + s_ = db->storage()->Strlen(key_, &len); if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(len); @@ -1072,10 +1072,10 @@ void StrlenCmd::Do(std::shared_ptr slot) { } } -void StrlenCmd::ReadCache(std::shared_ptr slot) { +void StrlenCmd::ReadCache(std::shared_ptr db) { int32_t len = 0; std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - auto s= slot->cache()->Strlen(CachePrefixKeyK, &len); + auto s= db->cache()->Strlen(CachePrefixKeyK, &len); if (s.ok()) { res_.AppendInteger(len); } else { @@ -1083,9 +1083,9 @@ void StrlenCmd::ReadCache(std::shared_ptr slot) { } } -void StrlenCmd::DoThroughDB(std::shared_ptr slot) { +void StrlenCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - s_ = slot->db()->GetWithTTL(key_, &value_, &sec_); + s_ = db->storage()->GetWithTTL(key_, &value_, &sec_); if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(value_.size()); } else { @@ -1093,10 +1093,10 @@ void StrlenCmd::DoThroughDB(std::shared_ptr slot) { } } -void StrlenCmd::DoUpdateCache(std::shared_ptr slot) { +void StrlenCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->WriteKVToCache(CachePrefixKeyK, value_, sec_); + db->cache()->WriteKVToCache(CachePrefixKeyK, value_, sec_); } } @@ -1109,9 +1109,9 @@ void ExistsCmd::DoInitial() { keys_.erase(keys_.begin()); } -void ExistsCmd::Do(std::shared_ptr slot) { +void ExistsCmd::Do(std::shared_ptr db) { std::map type_status; - int64_t res = slot->db()->Exists(keys_, &type_status); + int64_t res = db->storage()->Exists(keys_, &type_status); if (res != -1) { res_.AppendInteger(res); } else { @@ -1119,9 +1119,9 @@ void ExistsCmd::Do(std::shared_ptr slot) { } } -void ExistsCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { +void ExistsCmd::Split(std::shared_ptr db, const HintKeys& hint_keys) { std::map type_status; - int64_t res = slot->db()->Exists(hint_keys.keys, &type_status); + int64_t res = db->storage()->Exists(hint_keys.keys, &type_status); if (res != -1) { split_res_ += res; } else { @@ -1131,12 +1131,12 @@ void ExistsCmd::Split(std::shared_ptr slot, const HintKeys& hint_keys) { void ExistsCmd::Merge() { res_.AppendInteger(split_res_); } -void ExistsCmd::ReadCache(std::shared_ptr slot) { +void ExistsCmd::ReadCache(std::shared_ptr db) { int result = keys_.size(); std::string CachePrefixKeyK; for (auto key : keys_) { CachePrefixKeyK = PCacheKeyPrefixK + key; - bool exit = slot->cache()->Exists(CachePrefixKeyK); + bool exit = db->cache()->Exists(CachePrefixKeyK); if (!exit){ result--; } @@ -1148,9 +1148,9 @@ void ExistsCmd::ReadCache(std::shared_ptr slot) { } } -void ExistsCmd::DoThroughDB(std::shared_ptr slot) { +void ExistsCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - Do(slot); + Do(db); } void ExpireCmd::DoInitial() { @@ -1165,9 +1165,9 @@ void ExpireCmd::DoInitial() { } } -void ExpireCmd::Do(std::shared_ptr slot) { +void ExpireCmd::Do(std::shared_ptr db) { std::map type_status; - int64_t res = slot->db()->Expire(key_, static_cast(sec_), &type_status); + int64_t res = db->storage()->Expire(key_, static_cast(sec_), &type_status); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1199,14 +1199,14 @@ std::string ExpireCmd::ToRedisProtocol() { return content; } -void ExpireCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void ExpireCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void ExpireCmd::DoUpdateCache(std::shared_ptr slot) { +void ExpireCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Expire(CachePrefixKeyK, sec_); + db->cache()->Expire(CachePrefixKeyK, sec_); } } @@ -1222,9 +1222,9 @@ void PexpireCmd::DoInitial() { } } -void PexpireCmd::Do(std::shared_ptr slot) { +void PexpireCmd::Do(std::shared_ptr db) { std::map type_status; - int64_t res = slot->db()->Expire(key_, static_cast(msec_ / 1000), &type_status); + int64_t res = db->storage()->Expire(key_, static_cast(msec_ / 1000), &type_status); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1256,14 +1256,14 @@ std::string PexpireCmd::ToRedisProtocol() { return content; } -void PexpireCmd::DoThroughDB(std::shared_ptr slot){ - Do(slot); +void PexpireCmd::DoThroughDB(std::shared_ptr db){ + Do(db); } -void PexpireCmd::DoUpdateCache(std::shared_ptr slot) { +void PexpireCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Expire(CachePrefixKeyK, msec_/1000); + db->cache()->Expire(CachePrefixKeyK, msec_/1000); } } @@ -1279,9 +1279,9 @@ void ExpireatCmd::DoInitial() { } } -void ExpireatCmd::Do(std::shared_ptr slot) { +void ExpireatCmd::Do(std::shared_ptr db) { std::map type_status; - int32_t res = slot->db()->Expireat(key_, static_cast(time_stamp_), &type_status); + int32_t res = db->storage()->Expireat(key_, static_cast(time_stamp_), &type_status); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1292,14 +1292,14 @@ void ExpireatCmd::Do(std::shared_ptr slot) { } } -void ExpireatCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void ExpireatCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void ExpireatCmd::DoUpdateCache(std::shared_ptr slot) { +void ExpireatCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Expireat(CachePrefixKeyK, time_stamp_); + db->cache()->Expireat(CachePrefixKeyK, time_stamp_); } } @@ -1337,9 +1337,9 @@ std::string PexpireatCmd::ToRedisProtocol() { return content; } -void PexpireatCmd::Do(std::shared_ptr slot) { +void PexpireatCmd::Do(std::shared_ptr db) { std::map type_status; - int32_t res = slot->db()->Expireat(key_, static_cast(time_stamp_ms_ / 1000), &type_status); + int32_t res = db->storage()->Expireat(key_, static_cast(time_stamp_ms_ / 1000), &type_status); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1349,14 +1349,14 @@ void PexpireatCmd::Do(std::shared_ptr slot) { } } -void PexpireatCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void PexpireatCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void PexpireatCmd::DoUpdateCache(std::shared_ptr slot) { +void PexpireatCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Expireat(CachePrefixKeyK, time_stamp_ms_/1000); + db->cache()->Expireat(CachePrefixKeyK, time_stamp_ms_/1000); } } @@ -1368,10 +1368,10 @@ void TtlCmd::DoInitial() { key_ = argv_[1]; } -void TtlCmd::Do(std::shared_ptr slot) { +void TtlCmd::Do(std::shared_ptr db) { std::map type_timestamp; std::map type_status; - type_timestamp = slot->db()->TTL(key_, &type_status); + type_timestamp = db->storage()->TTL(key_, &type_status); for (const auto& item : type_timestamp) { // mean operation exception errors happen in database if (item.second == -3) { @@ -1395,10 +1395,10 @@ void TtlCmd::Do(std::shared_ptr slot) { } } -void TtlCmd::ReadCache(std::shared_ptr slot) { +void TtlCmd::ReadCache(std::shared_ptr db) { int64_t ttl = -1; std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - auto s = slot->cache()->TTL(CachePrefixKeyK, &ttl); + auto s = db->cache()->TTL(CachePrefixKeyK, &ttl); if (s.ok()) { res_.AppendInteger(ttl); } else { @@ -1406,9 +1406,9 @@ void TtlCmd::ReadCache(std::shared_ptr slot) { } } -void TtlCmd::DoThroughDB(std::shared_ptr slot) { +void TtlCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - Do(slot); + Do(db); } void PttlCmd::DoInitial() { @@ -1419,10 +1419,10 @@ void PttlCmd::DoInitial() { key_ = argv_[1]; } -void PttlCmd::Do(std::shared_ptr slot) { +void PttlCmd::Do(std::shared_ptr db) { std::map type_timestamp; std::map type_status; - type_timestamp = slot->db()->TTL(key_, &type_status); + type_timestamp = db->storage()->TTL(key_, &type_status); for (const auto& item : type_timestamp) { // mean operation exception errors happen in database if (item.second == -3) { @@ -1466,10 +1466,10 @@ void PttlCmd::Do(std::shared_ptr slot) { } } -void PttlCmd::ReadCache(std::shared_ptr slot) { +void PttlCmd::ReadCache(std::shared_ptr db) { int64_t ttl = -1; std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - auto s = slot->cache()->TTL(CachePrefixKeyK, &ttl); + auto s = db->cache()->TTL(CachePrefixKeyK, &ttl); if (s.ok()) { res_.AppendInteger(ttl * 1000); } else { @@ -1477,9 +1477,9 @@ void PttlCmd::ReadCache(std::shared_ptr slot) { } } -void PttlCmd::DoThroughDB(std::shared_ptr slot) { +void PttlCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - Do(slot); + Do(db); } void PersistCmd::DoInitial() { @@ -1490,9 +1490,9 @@ void PersistCmd::DoInitial() { key_ = argv_[1]; } -void PersistCmd::Do(std::shared_ptr slot) { +void PersistCmd::Do(std::shared_ptr db) { std::map type_status; - int32_t res = slot->db()->Persist(key_, &type_status); + int32_t res = db->storage()->Persist(key_, &type_status); if (res != -1) { res_.AppendInteger(res); s_ = rocksdb::Status::OK(); @@ -1502,14 +1502,14 @@ void PersistCmd::Do(std::shared_ptr slot) { } } -void PersistCmd::DoThroughDB(std::shared_ptr slot) { - Do(slot); +void PersistCmd::DoThroughDB(std::shared_ptr db) { + Do(db); } -void PersistCmd::DoUpdateCache(std::shared_ptr slot) { +void PersistCmd::DoUpdateCache(std::shared_ptr db) { if (s_.ok()) { std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - slot->cache()->Persist(CachePrefixKeyK); + db->cache()->Persist(CachePrefixKeyK); } } @@ -1521,9 +1521,9 @@ void TypeCmd::DoInitial() { key_ = argv_[1]; } -void TypeCmd::Do(std::shared_ptr slot) { +void TypeCmd::Do(std::shared_ptr db) { std::vector types(1); - rocksdb::Status s = slot->db()->GetType(key_, true, types); + rocksdb::Status s = db->storage()->GetType(key_, true, types); if (s.ok()) { res_.AppendContent("+" + types[0]); } else { @@ -1531,10 +1531,10 @@ void TypeCmd::Do(std::shared_ptr slot) { } } -void TypeCmd::ReadCache(std::shared_ptr slot) { +void TypeCmd::ReadCache(std::shared_ptr db) { std::string type; std::string CachePrefixKeyK = PCacheKeyPrefixK + key_; - auto s = slot->cache()->Type(CachePrefixKeyK, &type); + auto s = db->cache()->Type(CachePrefixKeyK, &type); if (s.ok()) { res_.AppendContent("+" + type); } else { @@ -1542,9 +1542,9 @@ void TypeCmd::ReadCache(std::shared_ptr slot) { } } -void TypeCmd::DoThroughDB(std::shared_ptr slot) { +void TypeCmd::DoThroughDB(std::shared_ptr db) { res_.clear(); - Do(slot); + Do(db); } void PTypeCmd::DoInitial() { @@ -1555,9 +1555,9 @@ void PTypeCmd::DoInitial() { key_ = argv_[1]; } -void PTypeCmd::Do(std::shared_ptr slot) { +void PTypeCmd::Do(std::shared_ptr db) { std::vector types(5); - rocksdb::Status s = slot->db()->GetType(key_, false, types); + rocksdb::Status s = db->storage()->GetType(key_, false, types); if (s.ok()) { res_.AppendArrayLenUint64(types.size()); @@ -1620,7 +1620,7 @@ void ScanCmd::DoInitial() { } } -void ScanCmd::Do(std::shared_ptr slot) { +void ScanCmd::Do(std::shared_ptr db) { int64_t total_key = 0; int64_t batch_count = 0; int64_t left = count_; @@ -1633,7 +1633,7 @@ void ScanCmd::Do(std::shared_ptr slot) { keys.clear(); batch_count = left < PIKA_SCAN_STEP_LENGTH ? left : PIKA_SCAN_STEP_LENGTH; left = left > PIKA_SCAN_STEP_LENGTH ? left - PIKA_SCAN_STEP_LENGTH : 0; - cursor_ret = slot->db()->Scan(type_, cursor_ret, pattern_, batch_count, &keys); + cursor_ret = db->storage()->Scan(type_, cursor_ret, pattern_, batch_count, &keys); for (const auto& key : keys) { RedisAppendLenUint64(raw, key.size(), "$"); RedisAppendContent(raw, key); @@ -1701,10 +1701,10 @@ void ScanxCmd::DoInitial() { } } -void ScanxCmd::Do(std::shared_ptr slot) { +void ScanxCmd::Do(std::shared_ptr db) { std::string next_key; std::vector keys; - rocksdb::Status s = slot->db()->Scanx(type_, start_key_, pattern_, count_, &keys, &next_key); + rocksdb::Status s = db->storage()->Scanx(type_, start_key_, pattern_, count_, &keys, &next_key); if (s.ok()) { res_.AppendArrayLen(2); @@ -1734,8 +1734,8 @@ void PKSetexAtCmd::DoInitial() { } } -void PKSetexAtCmd::Do(std::shared_ptr slot) { - s_ = slot->db()->PKSetexAt(key_, value_, static_cast(time_stamp_)); +void PKSetexAtCmd::Do(std::shared_ptr db) { + s_ = db->storage()->PKSetexAt(key_, value_, static_cast(time_stamp_)); if (s_.ok()) { res_.SetRes(CmdRes::kOk); } else { @@ -1797,11 +1797,11 @@ void PKScanRangeCmd::DoInitial() { } } -void PKScanRangeCmd::Do(std::shared_ptr slot) { +void PKScanRangeCmd::Do(std::shared_ptr db) { std::string next_key; std::vector keys; std::vector kvs; - s_ = slot->db()->PKScanRange(type_, key_start_, key_end_, pattern_, static_cast(limit_), &keys, &kvs, &next_key); + s_ = db->storage()->PKScanRange(type_, key_start_, key_end_, pattern_, static_cast(limit_), &keys, &kvs, &next_key); if (s_.ok()) { res_.AppendArrayLen(2); @@ -1881,11 +1881,11 @@ void PKRScanRangeCmd::DoInitial() { } } -void PKRScanRangeCmd::Do(std::shared_ptr slot) { +void PKRScanRangeCmd::Do(std::shared_ptr db) { std::string next_key; std::vector keys; std::vector kvs; - s_ = slot->db()->PKRScanRange(type_, key_start_, key_end_, pattern_, static_cast(limit_), + s_ = db->storage()->PKRScanRange(type_, key_start_, key_end_, pattern_, static_cast(limit_), &keys, &kvs, &next_key); if (s_.ok()) { diff --git a/src/pika_slot_command.cc b/src/pika_slot_command.cc index 7279f7b20f..48f45ef414 100644 --- a/src/pika_slot_command.cc +++ b/src/pika_slot_command.cc @@ -799,7 +799,7 @@ int GetSlotsID(const std::string &str, uint32_t *pcrc, int *phastag) { uint32_t CRC32CheckSum(const char *buf, int len) { return CRC32Update(0, buf, len); } // add key to slotkey -void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr& slot) { +void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr& db) { if (g_pika_conf->slotmigrate() != true) { return; } @@ -812,7 +812,7 @@ void AddSlotKey(const std::string& type, const std::string& key, const std::shar std::string slot_key = GetSlotKey(slotID); std::vector members; members.emplace_back(type + key); - s = slot->db()->SAdd(slot_key, members, &res); + s = db->storage()->SAdd(slot_key, members, &res); if (!s.ok()) { LOG(ERROR) << "sadd key[" << key << "] to slotKey[" << slot_key << "] failed, error: " << s.ToString(); return; @@ -822,7 +822,7 @@ void AddSlotKey(const std::string& type, const std::string& key, const std::shar // prevent write slot_key success, but write tag_key failed, so always write tag_key if (hastag) { std::string tag_key = GetSlotsTagKey(crc); - s = slot->db()->SAdd(tag_key, members, &res); + s = db->storage()->SAdd(tag_key, members, &res); if (!s.ok()) { LOG(ERROR) << "sadd key[" << key << "] to tagKey[" << tag_key << "] failed, error: " << s.ToString(); return; @@ -831,19 +831,19 @@ void AddSlotKey(const std::string& type, const std::string& key, const std::shar } // del key from slotkey -void RemSlotKey(const std::string& key, const std::shared_ptr& slot) { +void RemSlotKey(const std::string& key, const std::shared_ptr& db) { if (g_pika_conf->slotmigrate() != true) { return; } std::string type; - if (GetKeyType(key, type, slot) < 0) { + if (GetKeyType(key, type, db) < 0) { LOG(WARNING) << "SRem key: " << key << " from slotKey error"; return; } std::string slotKey = GetSlotKey(GetSlotID(key)); int32_t count = 0; std::vector members(1, type + key); - rocksdb::Status s = slot->db()->SRem(slotKey, members, &count); + rocksdb::Status s = db->storage()->SRem(slotKey, members, &count); if (!s.ok()) { LOG(WARNING) << "SRem key: " << key << " from slotKey, error: " << s.ToString(); return;