Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change storage ttl time from seconds to milliseconds #2822

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
working-directory: ${{ github.workspace }}/build
# Execute tests defined by the CMake configuration.
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
run: ctest -C ${{ env.BUILD_TYPE }}
run: ctest -C ${{ env.BUILD_TYPE }} --verbose

- name: Unit Test
working-directory: ${{ github.workspace }}
Expand Down Expand Up @@ -111,7 +111,7 @@ jobs:

- name: Test
working-directory: ${{ github.workspace }}/build
run: ctest --rerun-failed --output-on-failure -C ${{ env.BUILD_TYPE }}
run: ctest -C ${{ env.BUILD_TYPE }} --verbose

- name: Unit Test
working-directory: ${{ github.workspace }}
Expand Down
2 changes: 1 addition & 1 deletion src/net/examples/performance/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) {

std::unique_ptr<ServerThread> st_thread(NewDispatchThread(ip, port, 24, &conn_factory, 1000));
st_thread->StartThread();
uint64_t st, ed;
pstd::TimeType st, ed;

while (!should_stop) {
st = NowMicros();
Expand Down
8 changes: 4 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2730,8 +2730,8 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"The rsync rate limit now is "
<< new_throughput_limit << "(Which Is Around " << (new_throughput_limit >> 20) << " MB/s)";
res_.AppendStringRaw("+OK\r\n");
} else if(set_item == "rsync-timeout-ms"){
if(pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0){
} else if (set_item == "rsync-timeout-ms") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rsync-timeout-ms'\r\n");
return;
}
Expand Down Expand Up @@ -2926,9 +2926,9 @@ void DbsizeCmd::Do() {
if (!dbs) {
res_.SetRes(CmdRes::kInvalidDB);
} else {
if (g_pika_conf->slotmigrate()){
if (g_pika_conf->slotmigrate()) {
int64_t dbsize = 0;
for (int i = 0; i < g_pika_conf->default_slot_num(); ++i){
for (int i = 0; i < g_pika_conf->default_slot_num(); ++i) {
int32_t card = 0;
rocksdb::Status s = dbs->storage()->SCard(SlotKeyPrefix+std::to_string(i), &card);
if (s.ok() && card >= 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void BitGetCmd::DoThroughDB() {
Do();
}

void BitGetCmd::DoUpdateCache(){
void BitGetCmd::DoUpdateCache() {
if (s_.ok()) {
db_->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_KV, key_, db_);
}
Expand Down
4 changes: 2 additions & 2 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ int PikaConf::Load() {

std::string admin_cmd_list;
GetConfStr("admin-cmd-list", &admin_cmd_list);
if (admin_cmd_list == ""){
if (admin_cmd_list == "") {
admin_cmd_list = "info, monitor, ping";
SetAdminCmd(admin_cmd_list);
}
Expand Down Expand Up @@ -704,7 +704,7 @@ int PikaConf::Load() {

int64_t tmp_rsync_timeout_ms = -1;
GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms);
if(tmp_rsync_timeout_ms <= 0){
if (tmp_rsync_timeout_ms <= 0) {
rsync_timeout_ms_.store(1000);
} else {
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ static void GetAllNeighbors(const std::shared_ptr<DB>& db, std::string& key, Geo
int32_t count = 0;
int32_t card = db->storage()->Exists({range.storekey});
if (card) {
if (db->storage()->Del({range.storekey}) > 0){
if (db->storage()->Del({range.storekey}) > 0) {
db->cache()->Del({range.storekey});
}
}
Expand Down
25 changes: 7 additions & 18 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ void AppendCmd::Do() {
}
}

void AppendCmd::DoThroughDB(){
void AppendCmd::DoThroughDB() {
Do();
}

Expand Down Expand Up @@ -529,7 +529,7 @@ void MgetCmd::AssembleResponseFromCache() {
void MgetCmd::Do() {
// Without using the cache and querying only the DB, we need to use keys_.
// This line will only be assigned when querying the DB directly.
if(cache_miss_keys_.size() == 0) {
if (cache_miss_keys_.size() == 0) {
cache_miss_keys_ = keys_;
}
db_value_status_array_.clear();
Expand Down Expand Up @@ -944,7 +944,7 @@ void MsetCmd::DoBinlog() {
set_argv[0] = "set";
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_.lock());
for(auto& kv: kvs_){
for(auto& kv: kvs_) {
set_argv[1] = kv.key;
set_argv[2] = kv.value;
set_cmd_->Initial(set_argv, db_name_);
Expand Down Expand Up @@ -1293,7 +1293,7 @@ std::string PexpireCmd::ToRedisProtocol() {
return content;
}

void PexpireCmd::DoThroughDB(){
void PexpireCmd::DoThroughDB() {
Do();
}

Expand Down Expand Up @@ -1433,7 +1433,7 @@ void PttlCmd::DoInitial() {
}

void PttlCmd::Do() {
int64_t timestamp = db_->storage()->TTL(key_);
int64_t timestamp = db_->storage()->PTTL(key_);
if (timestamp == -3) {
res_.SetRes(CmdRes::kErrOther, "ttl internal error");
} else {
Expand All @@ -1442,19 +1442,8 @@ void PttlCmd::Do() {
}

void PttlCmd::ReadCache() {
int64_t timestamp = db_->cache()->TTL(key_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expire, expireat, pexpire, pexireat这些也需要修改吧。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这几个方法没有没有重写 ReadCche 方法,应该不用改动

if (timestamp == -3) {
res_.SetRes(CmdRes::kErrOther, "ttl internal error");
} else if (timestamp != -2) {
if (timestamp == -1) {
res_.AppendInteger(-1);
} else {
res_.AppendInteger(timestamp * 1000);
}
} else {
// mean this key not exist
res_.SetRes(CmdRes::kCacheMiss);
}
// redis cache don't support pttl cache, so read directly from db
DoThroughDB();
}

void PttlCmd::DoThroughDB() {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void LLenCmd::Do() {
void LLenCmd::ReadCache() {
uint64_t llen = 0;
auto s = db_->cache()->LLen(key_, &llen);
if (s.ok()){
if (s.ok()) {
res_.AppendInteger(llen);
} else if (s.IsNotFound()) {
res_.SetRes(CmdRes::kCacheMiss);
Expand Down
16 changes: 8 additions & 8 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
master_boffset.offset - sent_slave_boffset.offset;
tmp_stream << "(" << db->DBName() << ":" << lag << ")";
}
} else if (s.ok() && slave_state == SlaveState::kSlaveDbSync){
} else if (s.ok() && slave_state == SlaveState::kSlaveDbSync) {
tmp_stream << "(" << db->DBName() << ":full syncing)";
} else {
tmp_stream << "(" << db->DBName() << ":not syncing)";
Expand Down Expand Up @@ -1500,16 +1500,16 @@ void DoBgslotsreload(void* arg) {
rocksdb::Status s;
std::vector<std::string> keys;
int64_t cursor_ret = -1;
while(cursor_ret != 0 && p->GetSlotsreloading()){
while(cursor_ret != 0 && p->GetSlotsreloading()) {
cursor_ret = reload.db->storage()->Scan(storage::DataType::kAll, reload.cursor, reload.pattern, reload.count, &keys);

std::vector<std::string>::const_iterator iter;
for (iter = keys.begin(); iter != keys.end(); iter++) {
std::string key_type;
int s = GetKeyType(*iter, key_type, reload.db);
//if key is slotkey, can't add to SlotKey
if (s > 0){
if (key_type == "s" && ((*iter).find(SlotKeyPrefix) != std::string::npos || (*iter).find(SlotTagPrefix) != std::string::npos)){
if (s > 0) {
if (key_type == "s" && ((*iter).find(SlotKeyPrefix) != std::string::npos || (*iter).find(SlotTagPrefix) != std::string::npos)) {
continue;
}

Expand Down Expand Up @@ -1618,7 +1618,7 @@ void DoBgslotscleanup(void* arg) {
std::vector<std::string> keys;
int64_t cursor_ret = -1;
std::vector<int> cleanupSlots(cleanup.cleanup_slots);
while (cursor_ret != 0 && p->GetSlotscleaningup()){
while (cursor_ret != 0 && p->GetSlotscleaningup()) {
cursor_ret = g_pika_server->bgslots_cleanup_.db->storage()->Scan(storage::DataType::kAll, cleanup.cursor, cleanup.pattern, cleanup.count, &keys);

std::string key_type;
Expand All @@ -1627,12 +1627,12 @@ void DoBgslotscleanup(void* arg) {
if ((*iter).find(SlotKeyPrefix) != std::string::npos || (*iter).find(SlotTagPrefix) != std::string::npos) {
continue;
}
if (std::find(cleanupSlots.begin(), cleanupSlots.end(), GetSlotID(g_pika_conf->default_slot_num(), *iter)) != cleanupSlots.end()){
if (std::find(cleanupSlots.begin(), cleanupSlots.end(), GetSlotID(g_pika_conf->default_slot_num(), *iter)) != cleanupSlots.end()) {
if (GetKeyType(*iter, key_type, g_pika_server->bgslots_cleanup_.db) <= 0) {
LOG(WARNING) << "slots clean get key type for slot " << GetSlotID(g_pika_conf->default_slot_num(), *iter) << " key " << *iter << " error";
continue;
}
if (DeleteKey(*iter, key_type[0], g_pika_server->bgslots_cleanup_.db) <= 0){
if (DeleteKey(*iter, key_type[0], g_pika_server->bgslots_cleanup_.db) <= 0) {
LOG(WARNING) << "slots clean del for slot " << GetSlotID(g_pika_conf->default_slot_num(), *iter) << " key "<< *iter << " error";
}
}
Expand All @@ -1643,7 +1643,7 @@ void DoBgslotscleanup(void* arg) {
keys.clear();
}

for (int cleanupSlot : cleanupSlots){
for (int cleanupSlot : cleanupSlots) {
WriteDelKeyToBinlog(GetSlotKey(cleanupSlot), g_pika_server->bgslots_cleanup_.db);
WriteDelKeyToBinlog(GetSlotsTagKey(cleanupSlot), g_pika_server->bgslots_cleanup_.db);
}
Expand Down
10 changes: 5 additions & 5 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void ZCardCmd::Do() {
}
}

void ZCardCmd::ReadCache(){
void ZCardCmd::ReadCache() {
res_.SetRes(CmdRes::kCacheMiss);
}

Expand Down Expand Up @@ -590,7 +590,7 @@ void ZRevrangebyscoreCmd::Do() {
}
}

void ZRevrangebyscoreCmd::ReadCache(){
void ZRevrangebyscoreCmd::ReadCache() {
if (min_score_ == storage::ZSET_SCORE_MAX || max_score_ == storage::ZSET_SCORE_MIN
|| max_score_ < min_score_) {
res_.AppendContent("*0");
Expand Down Expand Up @@ -827,7 +827,7 @@ void ZUnionstoreCmd::DoBinlog() {
del_cmd->SetResp(resp_.lock());
del_cmd->DoBinlog();

if(value_to_dest_.empty()){
if (value_to_dest_.empty()) {
// The union operation got an empty set, only use del to simulate overwrite the dest_key with empty set
return;
}
Expand Down Expand Up @@ -975,7 +975,7 @@ void ZRankCmd::ReadCache() {
auto s = db_->cache()->ZRank(key_, member_, &rank, db_);
if (s.ok()) {
res_.AppendInteger(rank);
} else if (s.IsNotFound()){
} else if (s.IsNotFound()) {
res_.SetRes(CmdRes::kCacheMiss);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
Expand Down Expand Up @@ -1020,7 +1020,7 @@ void ZRevrankCmd::ReadCache() {
auto s = db_->cache()->ZRevrank(key_, member_, &revrank, db_);
if (s.ok()) {
res_.AppendInteger(revrank);
} else if (s.IsNotFound()){
} else if (s.IsNotFound()) {
res_.SetRes(CmdRes::kCacheMiss);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
Expand Down
7 changes: 6 additions & 1 deletion src/pstd/include/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class SequentialFile;
class RWFile;
class RandomRWFile;

using TimeType = uint64_t;

/*
* Set the resource limits of a process
*/
Expand Down Expand Up @@ -61,7 +63,10 @@ class FileLock : public pstd::noncopyable {
int GetChildren(const std::string& dir, std::vector<std::string>& result);
void GetDescendant(const std::string& dir, std::vector<std::string>& result);

uint64_t NowMicros();
TimeType NowMicros();

TimeType NowMillis();

void SleepForMicroseconds(int micros);

Status NewSequentialFile(const std::string& fname, std::unique_ptr<SequentialFile>& result);
Expand Down
7 changes: 6 additions & 1 deletion src/pstd/src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,16 @@ uint64_t Du(const std::string& path) {
return sum;
}

uint64_t NowMicros() {
TimeType NowMicros() {
auto now = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
}

TimeType NowMillis() {
auto now = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
}

void SleepForMicroseconds(int micros) { std::this_thread::sleep_for(std::chrono::microseconds(micros)); }

SequentialFile::~SequentialFile() = default;
Expand Down
7 changes: 7 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,13 @@ class Storage {
// return > 0 TTL in seconds
int64_t TTL(const Slice& key);

// Returns the remaining time to live of a key that has a timeout.
// return -3 operation exception errors happen in database
// return -2 if the key does not exist
// return -1 if the key exists but has not associated expire
// return > 0 TTL in milliseconds
int64_t PTTL(const Slice& key);

// Reutrns the data all type of the key
// if single is true, the query will return the first one
Status GetType(const std::string& key, enum DataType& type);
Expand Down
12 changes: 8 additions & 4 deletions src/storage/src/base_data_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class BaseDataValue : public InternalValue {
dst += user_value_.size();
memcpy(dst, reserve_, kSuffixReserveLength);
dst += kSuffixReserveLength;
EncodeFixed64(dst, ctime_);
uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0;
EncodeFixed64(dst, ctime);
dst += kTimestampLength;
return rocksdb::Slice(start_pos, needed);
}
Expand All @@ -58,7 +59,8 @@ class ParsedBaseDataValue : public ParsedInternalValue {
if (value_->size() >= kBaseDataValueSuffixLength) {
user_value_ = rocksdb::Slice(value_->data(), value_->size() - kBaseDataValueSuffixLength);
memcpy(reserve_, value_->data() + user_value_.size(), kSuffixReserveLength);
ctime_ = DecodeFixed64(value_->data() + user_value_.size() + kSuffixReserveLength);
uint64_t ctime = DecodeFixed64(value_->data() + user_value_.size() + kSuffixReserveLength);
ctime_ = (ctime & ~(1ULL << 63));
}
}

Expand All @@ -70,7 +72,8 @@ class ParsedBaseDataValue : public ParsedInternalValue {
if (value.size() >= kBaseDataValueSuffixLength) {
user_value_ = rocksdb::Slice(value.data(), value.size() - kBaseDataValueSuffixLength);
memcpy(reserve_, value.data() + user_value_.size(), kSuffixReserveLength);
ctime_ = DecodeFixed64(value.data() + user_value_.size() + kSuffixReserveLength);
uint64_t ctime = DecodeFixed64(value.data() + user_value_.size() + kSuffixReserveLength);
ctime_ = (ctime & ~(1ULL << 63));
}
}

Expand All @@ -81,7 +84,8 @@ class ParsedBaseDataValue : public ParsedInternalValue {
void SetCtimeToValue() override {
if (value_) {
char* dst = const_cast<char*>(value_->data()) + value_->size() - kTimestampLength;
EncodeFixed64(dst, ctime_);
uint64_t ctime = ctime_ > 0 ? (ctime_ | (1ULL << 63)) : 0;
EncodeFixed64(dst, ctime);
}
}

Expand Down
9 changes: 3 additions & 6 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
BaseMetaFilter() = default;
bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, std::string* new_value,
bool* value_changed) const override {
int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
auto cur_time = static_cast<uint64_t>(unix_time);
auto cur_time = pstd::NowMillis();
/*
* For the filtering of meta information, because the field designs of string
* and list are different, their filtering policies are written separately.
Expand Down Expand Up @@ -181,9 +179,8 @@ class BaseDataFilter : public rocksdb::CompactionFilter {
return true;
}

int64_t unix_time;
rocksdb::Env::Default()->GetCurrentTime(&unix_time);
if (cur_meta_etime_ != 0 && cur_meta_etime_ < static_cast<uint64_t>(unix_time)) {
pstd::TimeType unix_time = pstd::NowMillis();
if (cur_meta_etime_ != 0 && cur_meta_etime_ < unix_time) {
TRACE("Drop[Timeout]");
return true;
}
Expand Down
Loading
Loading