Skip to content

Commit

Permalink
feat(tianmu):The delta layer supports master-slave synchronization(st…
Browse files Browse the repository at this point in the history
  • Loading branch information
konghaiya committed Mar 10, 2023
1 parent c040c50 commit 5e1305e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
39 changes: 17 additions & 22 deletions storage/tianmu/core/tianmu_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,21 @@ void TianmuTable::DeleteItem(uint64_t row, uint64_t col, core::Transaction *curr
m_attrs[col]->DeleteData(row);
}

void TianmuTable::GetKeys(TABLE *table, std::vector<std::string> &keys,
std::shared_ptr<index::TianmuTableIndex> &indexTab) {
std::vector<uint> cols = indexTab->KeyCols();
std::vector<loader::ValueCache> vcs;
vcs.reserve(cols.size());
int i = 0;
for (auto &col : cols) {
vcs.emplace_back(1, 128);
Field2VC(table->field[col], vcs[i], col);
vcs[i].Commit();
keys.emplace_back(vcs[i].GetDataBytesPointer(0), vcs[i].Size(0));
i++;
}
}

uint64_t TianmuTable::ProceedNormal(system::IOParameters &iop) {
std::unique_ptr<system::Stream> fs;
if (iop.LocalLoad()) {
Expand Down Expand Up @@ -1480,17 +1495,7 @@ void TianmuTable::InsertIndexForDelta(TABLE *table, uint64_t row_id) {
std::shared_ptr<index::TianmuTableIndex> tab = ha_tianmu_engine_->GetTableIndex(share->Path());
if (tab) {
std::vector<std::string> fields;
std::vector<uint> cols = tab->KeyCols();
std::vector<loader::ValueCache> vcs;
vcs.reserve(cols.size());
int i = 0;
for (auto &col : cols) {
vcs.emplace_back(1, 128);
Field2VC(table->field[col], vcs[i], col);
vcs[i].Commit();
fields.emplace_back(vcs[i].GetDataBytesPointer(0), vcs[i].Size(0));
i++;
}
GetKeys(table, fields, tab);

if (tab->InsertIndex(current_txn_, fields, row_id) == common::ErrorCode::DUPP_KEY) {
TIANMU_LOG(LogCtl_Level::INFO, "Insert duplicate key on row %d, key: %s", row_id, fields[0].data());
Expand All @@ -1510,17 +1515,7 @@ void TianmuTable::DeleteIndexForDelta(TABLE *table, uint64_t row_id) {
std::shared_ptr<index::TianmuTableIndex> tab = ha_tianmu_engine_->GetTableIndex(share->Path());
if (tab) {
std::vector<std::string> fields;
std::vector<uint> cols = tab->KeyCols();
std::vector<loader::ValueCache> vcs;
vcs.reserve(cols.size());
int i = 0;
for (auto &col : cols) {
vcs.emplace_back(1, 128);
Field2VC(table->field[col], vcs[i], col);
vcs[i].Commit();
fields.emplace_back(vcs[i].GetDataBytesPointer(0), vcs[i].Size(0));
i++;
}
GetKeys(table, fields, tab);

if (tab->DeleteIndex(current_txn_, fields, row_id) == common::ErrorCode::FAILED) {
TIANMU_LOG(LogCtl_Level::DEBUG, "Delete row: %s for primary key field", row_id);
Expand Down
3 changes: 3 additions & 0 deletions storage/tianmu/core/tianmu_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "core/delta_table.h"
#include "core/just_a_table.h"
#include "core/tianmu_attr.h"
#include "index/tianmu_table_index.h"
#include "util/fs.h"

namespace Tianmu {
Expand Down Expand Up @@ -152,6 +153,8 @@ class TianmuTable final : public JustATable {

std::unique_lock<std::mutex> write_lock;

void GetKeys(TABLE *table, std::vector<std::string> &keys, std::shared_ptr<index::TianmuTableIndex> &indexTab);

private:
uint64_t ProceedNormal(system::IOParameters &iop);
uint64_t ProcessDelayed(system::IOParameters &iop);
Expand Down
11 changes: 7 additions & 4 deletions storage/tianmu/handler/ha_tianmu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,13 +748,15 @@ int ha_tianmu::index_read([[maybe_unused]] uchar *buf, [[maybe_unused]] const uc
table->status = STATUS_NOT_FOUND;
auto index = ha_tianmu_engine_->GetTableIndex(table_name_);
if (index && (active_index == table_share->primary_key)) {
auto tab = ha_tianmu_engine_->GetTableRD(table_name_);
std::vector<std::string> keys;
key_convert(key, key_len, index->KeyCols(), keys);
// support equality fullkey lookup over primary key, using full tuple
tab->GetKeys(table, keys, index);

if (find_flag == HA_READ_KEY_EXACT) {
uint64_t rowid;
if (index->GetRowByKey(current_txn_, keys, rowid) == common::ErrorCode::SUCCESS) {
rc = fill_row_by_id(buf, rowid);
current_position_ = rowid;
rc = 0;
}
if (!rc)
table->status = 0;
Expand All @@ -764,7 +766,8 @@ int ha_tianmu::index_read([[maybe_unused]] uchar *buf, [[maybe_unused]] const uc
iter->ScanToKey(index, keys, op);
uint64_t rowid;
iter->GetRowid(rowid);
rc = fill_row_by_id(buf, rowid);
current_position_ = rowid;
rc = 0;
if (!rc)
table->status = 0;
} else {
Expand Down

0 comments on commit 5e1305e

Please sign in to comment.