Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/txn (#1585) #2124

Merged
merged 5 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "storage/storage.h"

#include "include/pika_command.h"
#include "pika_db.h"

/*
* Admin
Expand Down Expand Up @@ -149,6 +150,7 @@ class SelectCmd : public Cmd {
void DoInitial() override;
void Clear() override { db_name_.clear(); }
std::string db_name_;
std::shared_ptr<DB> select_db_;
};

class FlushallCmd : public Cmd {
Expand All @@ -158,11 +160,13 @@ class FlushallCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }

void Execute() override;
void FlushAllWithoutLock();
private:
void DoInitial() override;
std::string ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum,
uint64_t offset) override;
void DoWithoutLock(std::shared_ptr<Slot> slot);
};

class FlushdbCmd : public Cmd {
Expand All @@ -172,11 +176,15 @@ class FlushdbCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllSlotsWithoutLock(std::shared_ptr<DB> db);
void Execute() override;
std::string GetFlushDname() { return db_name_; }

private:
std::string db_name_;
void DoInitial() override;
void Clear() override { db_name_.clear(); }
void DoWithoutLock(std::shared_ptr<Slot> slot);
};

class ClientCmd : public Cmd {
Expand Down Expand Up @@ -219,6 +227,7 @@ class InfoCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new InfoCmd(*this); }
void Execute() override;

private:
InfoSection info_section_;
Expand Down Expand Up @@ -280,6 +289,7 @@ class ConfigCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new ConfigCmd(*this); }
void Execute() override;

private:
std::vector<std::string> config_args_v_;
Expand Down
41 changes: 37 additions & 4 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_

#include <bitset>
#include <utility>

#include "include/pika_command.h"
Expand Down Expand Up @@ -54,6 +55,14 @@ class PikaClientConn : public net::RedisConn {
uint32_t slot_id;
};

struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
static constexpr uint8_t InitCmdFailed = 1;
static constexpr uint8_t WatchFailed = 2;
static constexpr uint8_t Execing = 3;
};

// Auth related
class AuthStat {
public:
Expand All @@ -72,7 +81,7 @@ class PikaClientConn : public net::RedisConn {

PikaClientConn(int fd, const std::string& ip_port, net::Thread* server_thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() override = default;
~PikaClientConn() = default;

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) override;
Expand All @@ -84,10 +93,30 @@ class PikaClientConn : public net::RedisConn {

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentTable(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override{ return current_db_; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override { return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();

net::ServerThread* server_thread() { return server_thread_; }

AuthStat& auth_stat() { return auth_stat_; }
Expand All @@ -101,14 +130,18 @@ class PikaClientConn : public net::RedisConn {
std::string current_db_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_;
std::bitset<16> txn_state_;
std::unordered_set<std::string> watched_db_keys_;
std::mutex txn_state_mu_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void TryWriteResp();

AuthStat auth_stat_;
Expand Down
31 changes: 27 additions & 4 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_COMMAND_H_
#define PIKA_COMMAND_H_

#include <string>
#include <memory>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -198,6 +199,13 @@ const std::string kCmdNameSDiffstore = "sdiffstore";
const std::string kCmdNameSMove = "smove";
const std::string kCmdNameSRandmember = "srandmember";

// transation
const std::string kCmdNameMulti = "multi";
const std::string kCmdNameExec = "exec";
const std::string kCmdNameDiscard = "discard";
const std::string kCmdNameWatch = "watch";
const std::string kCmdNameUnWatch = "unwatch";

// HyperLogLog
const std::string kCmdNamePfAdd = "pfadd";
const std::string kCmdNamePfCount = "pfcount";
Expand Down Expand Up @@ -298,6 +306,9 @@ class CmdRes {
kInconsistentHashTag,
kErrOther,
KIncrByOverFlow,
kInvalidTransaction,
kTxnQueued,
kTxnAbort,
};

CmdRes() = default;
Expand Down Expand Up @@ -369,6 +380,17 @@ class CmdRes {
result.append(message_);
result.append("'\r\n");
break;
case kInvalidTransaction:
return "-ERR WATCH inside MULTI is not allowed\r\n";
case kTxnQueued:
result = "+QUEUED";
result.append("\r\n");
break;
case kTxnAbort:
result = "-EXECABORT ";
result.append(message_);
result.append(kNewLine);
break;
case kErrOther:
result = "-ERR ";
result.append(message_);
Expand Down Expand Up @@ -403,7 +425,9 @@ class CmdRes {
message_ = content;
}
}

CmdRet GetCmdRet() const {
return ret_;
}
private:
std::string message_;
CmdRet ret_ = kNone;
Expand Down Expand Up @@ -447,11 +471,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {

virtual std::vector<std::string> current_key() const;
virtual void Execute();
virtual void ProcessFlushDBCmd();
virtual void ProcessFlushAllCmd();
virtual void ProcessSingleSlotCmd();
virtual void ProcessMultiSlotCmd();
virtual void ProcessDoNotSpecifySlotCmd();
virtual void Do(std::shared_ptr<Slot> slot = nullptr) = 0;
virtual Cmd* Clone() = 0;
// used for execute multikey command into different slots
Expand All @@ -470,6 +491,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_multi_slot() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
void SetDbName(const std::string& db_name) { db_name_ = db_name; }
std::string GetDBName() { return db_name_; }

std::string name() const;
CmdRes& res();
Expand Down
3 changes: 3 additions & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;
friend class ExecCmd;
friend class FlushdbCmd;
friend class FlushallCmd;

std::string GetDBName();
void BgSaveDB();
Expand Down
1 change: 0 additions & 1 deletion include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class SetCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SetCmd(*this); }

private:
std::string key_;
std::string value_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ class RPopLPushCmd : public BlockingBaseCmd {
}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(source_);
res.push_back(receiver_);
res.push_back(source_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ class PikaReplicaManager {
~PikaReplicaManager() = default;

friend Cmd;
friend class FlushdbCmd;
friend class FlushallCmd;
friend class ExecCmd;

void Start();
void Stop();
Expand Down
5 changes: 4 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
#include "include/pika_migrate_thread.h"
#include "include/rsync_server.h"
#include "include/pika_statistic.h"
#include "include/pika_slot_command.h"
#include "include/pika_migrate_thread.h"
#include "include/pika_transaction.h"
#include "include/pika_cmd_table_manager.h"


Expand Down Expand Up @@ -507,6 +508,8 @@ class PikaServer : public pstd::noncopyable {
friend class InfoCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;
friend class FlushallCmd;
friend class ExecCmd;

private:
/*
Expand Down
2 changes: 2 additions & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
// FlushDB & FlushSubDB use
bool FlushDB();
bool FlushSubDB(const std::string& db_name);
bool FlushDBWithoutLock();
bool FlushSubDBWithoutLock(const std::string& db_name);

// key scan info use
pstd::Status GetKeyNum(std::vector<storage::KeyInfo>* key_info);
Expand Down
99 changes: 99 additions & 0 deletions include/pika_transaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_TRANSACTION_H_
#define PIKA_TRANSACTION_H_

#include "include/pika_command.h"
#include "net/include/redis_conn.h"
#include "pika_db.h"
#include "storage/storage.h"

class MultiCmd : public Cmd {
public:
MultiCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Cmd* Clone() override { return new MultiCmd(*this); }
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
void Merge() override {}

private:
void DoInitial() override;
};

class ExecCmd : public Cmd {
public:
ExecCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Cmd* Clone() override { return new ExecCmd(*this); }
void Execute() override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
void Merge() override {}
std::vector<std::string> current_key() const override { return {}; }
private:
struct CmdInfo {
public:
CmdInfo(std::shared_ptr<Cmd> cmd, std::shared_ptr<DB> db, std::shared_ptr<Slot> slot,
std::shared_ptr<SyncMasterSlot> sync_slot) : cmd_(cmd), db_(db), slot_(slot), sync_slot_(sync_slot) {}
std::shared_ptr<Cmd> cmd_;
std::shared_ptr<DB> db_;
std::shared_ptr<Slot> slot_;
std::shared_ptr<SyncMasterSlot> sync_slot_;
};
void DoInitial() override;
void Lock();
void Unlock();
bool IsTxnFailedAndSetState();
void SetCmdsVec();
void ServeToBLrPopWithKeys();
std::unordered_set<std::shared_ptr<DB>> lock_db_{};
std::unordered_map<std::shared_ptr<Slot>, std::vector<std::string>> lock_slot_keys_{};
std::unordered_set<std::shared_ptr<Slot>> r_lock_slots_{};
bool is_lock_rm_slots_{false}; // g_pika_rm->slots_rw_;
std::vector<CmdInfo> cmds_;
std::vector<CmdInfo> list_cmd_;
std::vector<std::string> keys_;
};

class DiscardCmd : public Cmd {
public:
DiscardCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Cmd* Clone() override { return new DiscardCmd(*this); }
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
void Merge() override {}
private:
void DoInitial() override;
};

class WatchCmd : public Cmd {
public:
WatchCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}

void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Execute() override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
Cmd* Clone() override { return new WatchCmd(*this); }
void Merge() override {}
std::vector<std::string> current_key() const override { return keys_; }
private:
void DoInitial() override;
std::vector<std::string> keys_;
std::vector<std::string> db_keys_; // cause the keys watched may cross different dbs, so add dbname as keys prefix
};

class UnwatchCmd : public Cmd {
public:
UnwatchCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}

void Do(std::shared_ptr<Slot> slot = nullptr) override;
Cmd* Clone() override { return new UnwatchCmd(*this); }
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
void Merge() override {}
private:
void DoInitial() override;
};

#endif // PIKA_TRANSACTION_H_
Loading