Skip to content

Commit

Permalink
only use snapshot getter when point/batch point executor is running
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 committed Dec 17, 2024
1 parent 57a73ea commit 54b5cbb
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type BatchPointGetExec struct {
rowDecoder *rowcodec.ChunkDecoder
keepOrder bool
desc bool
isTxnDirty bool // when isTxnDirty is true, we need to read through MemBuffer.
batchGetter kv.BatchGetter

columns []*model.ColumnInfo
Expand Down Expand Up @@ -111,7 +112,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, e.snapshot)
} else if lock != nil && (lock.Tp == pmodel.TableLockRead || lock.Tp == pmodel.TableLockReadOnly) && e.Ctx().GetSessionVars().EnablePointGetCache {
batchGetter = newCacheBatchGetter(e.Ctx(), e.tblInfo.ID, e.snapshot)
} else {
} else if e.isTxnDirty {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), nil, e.snapshot)
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5320,8 +5320,12 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
handles: handles,
idxVals: plan.IndexValues,
partitionNames: plan.PartitionNames,
isTxnDirty: false,
}

if txn, err := e.Ctx().Txn(false); err == nil && txn != nil && !txn.IsReadOnly() {
e.isTxnDirty = true
}
e.snapshot, err = b.getSnapshot()
if err != nil {
b.err = err
Expand Down
45 changes: 26 additions & 19 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) exec.Execut
e.SetMaxChunkSize(1)
e.Init(p)

if txn, err := b.ctx.Txn(false); err != nil {
b.err = err
return nil
} else if txn != nil && txn.Valid() && !txn.IsReadOnly() {
e.memBufferSnapshotGetter = txn.GetMemBuffer().SnapshotGetter()
}
e.snapshot, err = b.getSnapshot()
if err != nil {
b.err = err
Expand Down Expand Up @@ -135,23 +141,24 @@ type PointGetExecutor struct {
exec.BaseExecutor
indexUsageReporter *exec.IndexUsageReporter

tblInfo *model.TableInfo
handle kv.Handle
idxInfo *model.IndexInfo
partitionDefIdx *int
partitionNames []pmodel.CIStr
idxKey kv.Key
handleVal []byte
idxVals []types.Datum
txnScope string
readReplicaScope string
isStaleness bool
txn kv.Transaction
snapshot kv.Snapshot
done bool
lock bool
lockWaitTime int64
rowDecoder *rowcodec.ChunkDecoder
tblInfo *model.TableInfo
handle kv.Handle
idxInfo *model.IndexInfo
partitionDefIdx *int
partitionNames []pmodel.CIStr
idxKey kv.Key
handleVal []byte
idxVals []types.Datum
txnScope string
readReplicaScope string
isStaleness bool
txn kv.Transaction
memBufferSnapshotGetter kv.Getter
snapshot kv.Snapshot
done bool
lock bool
lockWaitTime int64
rowDecoder *rowcodec.ChunkDecoder

columns []*model.ColumnInfo
// virtualColumnIndex records all the indices of virtual columns and sort them in definition
Expand Down Expand Up @@ -645,10 +652,10 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
err error
)

if e.txn.Valid() && !e.txn.IsReadOnly() {
if e.memBufferSnapshotGetter != nil {
// We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be
// different for pessimistic transaction.
val, err = e.txn.GetMemBuffer().Get(ctx, key)
val, err = e.memBufferSnapshotGetter.Get(ctx, key)
if err == nil {
return val, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/session/test/txn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//pkg/config",
"//pkg/kv",
Expand Down
47 changes: 47 additions & 0 deletions pkg/session/test/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,3 +579,50 @@ func TestMemBufferCleanupMemoryLeak(t *testing.T) {
}
tk.MustExec("commit")
}

func TestMemDBRaceInUnionExec(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tables := 5
for i := 0; i < tables; i++ {
tk.MustExec(fmt.Sprintf("create table t%d(id int primary key, v int)", i))
}

tk.MustExec("insert into t0 values(1, 1), (2, 2), (3, 3), (4, 4)")
for i := 1; i < tables; i++ {
tk.MustExec(fmt.Sprintf("insert into t%d select * from t0", i))
}

tk.MustExec("set tidb_pessimistic_txn_fair_locking=0")

// point get
for i := 0; i < 1001; i++ {
tk.MustExec("begin pessimistic")
dirty := i%2 == 0
if dirty {
tk.MustExec("insert into t0 values(5, 5)")
}
tk.MustQuery(`select * from t0 where id = 1 for update union
select * from t1 where id = 1 for update union
select * from t2 where id = 1 for update union
select * from t3 where id = 1 for update union
select * from t4 where id = 1 for update`)
tk.MustExec("rollback")
}

// batch point get
for i := 0; i < 1001; i++ {
tk.MustExec("begin pessimistic")
dirty := i%2 == 0
if dirty {
tk.MustExec("insert into t0 values(5, 5)")
}
tk.MustQuery(`select * from t0 where id in (1, 2, 3) for update union
select * from t1 where id in (1, 2, 3) for update union
select * from t2 where id in (1, 2, 3) for update union
select * from t3 where id in (1, 2, 3) for update union
select * from t4 where id in (1, 2, 3) for update`)
tk.MustExec("rollback")
}
}

0 comments on commit 54b5cbb

Please sign in to comment.