Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete slot #2251

Merged
merged 20 commits into from
Jan 19, 2024
Merged
157 changes: 78 additions & 79 deletions include/pika_admin.h

Large diffs are not rendered by default.

14 changes: 2 additions & 12 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable {

void debug() {
std::shared_lock l(rwlock_);
printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_);
}

private:
Expand All @@ -63,12 +63,8 @@ class Binlog : public pstd::noncopyable {
// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);

uint64_t file_size() { return file_size_; }

std::string filename() { return filename_; }

bool IsBinlogIoError() { return binlog_io_error_; }

// need to hold mutex_
void SetTerm(uint32_t term) {
std::lock_guard l(version_->rwlock_);
Expand All @@ -85,11 +81,9 @@ class Binlog : public pstd::noncopyable {

private:
pstd::Status Put(const char* item, int len);
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
// pstd::WritableFile *queue() { return queue_; }

void InitLogFile();
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);

/*
* Produce
Expand All @@ -109,17 +103,13 @@ class Binlog : public pstd::noncopyable {

int block_offset_ = 0;

char* pool_ = nullptr;
bool exit_all_consume_ = false;
const std::string binlog_path_;

uint64_t file_size_ = 0;

std::string filename_;

std::atomic<bool> binlog_io_error_;
// Not use
// int32_t retry_;
};

#endif
3 changes: 0 additions & 3 deletions include/pika_binlog_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

#include "include/pika_binlog.h"

// using pstd::Slice;
// using pstd::Status;

class PikaBinlogReader {
public:
PikaBinlogReader(uint32_t cur_filenum, uint64_t cur_offset);
Expand Down
49 changes: 24 additions & 25 deletions include/pika_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include "storage/storage.h"

#include "include/pika_command.h"
#include "include/pika_slot.h"
#include "include/pika_kv.h"

/*
Expand All @@ -23,11 +22,11 @@ class BitGetCmd : public Cmd {
res.push_back(key_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Do(std::shared_ptr<DB> db) override;
void ReadCache(std::shared_ptr<DB> db) override;
void DoUpdateCache(std::shared_ptr<DB> db) override;
void DoThroughDB(std::shared_ptr<DB> db) override;
void Split(std::shared_ptr<DB> db, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitGetCmd(*this); }

Expand All @@ -50,10 +49,10 @@ class BitSetCmd : public Cmd {
res.push_back(key_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Do(std::shared_ptr<DB> db) override;
void DoUpdateCache(std::shared_ptr<DB> db) override;
void DoThroughDB(std::shared_ptr<DB> db) override;
void Split(std::shared_ptr<DB> db, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitSetCmd(*this); }

Expand All @@ -78,11 +77,11 @@ class BitCountCmd : public Cmd {
res.push_back(key_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Do(std::shared_ptr<DB> db) override;
void ReadCache(std::shared_ptr<DB> db) override;
void DoUpdateCache(std::shared_ptr<DB> db) override;
void DoThroughDB(std::shared_ptr<DB> db) override;
void Split(std::shared_ptr<DB> db, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitCountCmd(*this); }

Expand All @@ -109,11 +108,11 @@ class BitPosCmd : public Cmd {
res.push_back(key_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Do(std::shared_ptr<DB> db) override;
void ReadCache(std::shared_ptr<DB> db) override;
void DoUpdateCache(std::shared_ptr<DB> db) override;
void DoThroughDB(std::shared_ptr<DB> db) override;
void Split(std::shared_ptr<DB> db, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitPosCmd(*this); }

Expand Down Expand Up @@ -153,13 +152,13 @@ class BitOpCmd : public Cmd {
std::vector<std::string> current_key() const override {
return {dest_key_};
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Do(std::shared_ptr<DB> db) override;
void DoUpdateCache(std::shared_ptr<DB> db) override;
void DoThroughDB(std::shared_ptr<DB> db) override;
void Split(std::shared_ptr<DB> db, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new BitOpCmd(*this); }
void DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) override;
void DoBinlog(const std::shared_ptr<SyncMasterDB>& db) override;

private:
std::string dest_key_;
Expand Down
Loading