Skip to content

Commit

Permalink
store/tikv: support resolve specified lock keys (#10292) (#10793)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 authored and jackysp committed Jun 13, 2019
1 parent c5c2737 commit 328a876
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 3 deletions.
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type mvccLock struct {
op kvrpcpb.Op
ttl uint64
forUpdateTS uint64
txnSize uint64
}

type mvccEntry struct {
Expand Down
6 changes: 4 additions & 2 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
anyError := false
batch := &leveldb.Batch{}
errs := make([]error, 0, len(mutations))
txnSize := len(mutations)
for _, m := range mutations {
// If the operation is Insert, check if key is exists at first.
var err error
Expand All @@ -649,7 +650,7 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
continue
}
}
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl)
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize))
errs = append(errs, err)
if err != nil {
anyError = true
Expand Down Expand Up @@ -684,7 +685,7 @@ func checkConflictValue(iter *Iterator, key []byte, startTS uint64) error {
return nil
}

func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error {
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64) error {
startKey := mvccEncode(mutation.Key, lockVer)
iter := newIterator(db, &util.Range{
Start: startKey,
Expand Down Expand Up @@ -723,6 +724,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
value: mutation.Value,
op: op,
ttl: ttl,
txnSize: txnSize,
}
writeKey := mvccEncode(mutation.Key, lockVer)
writeValue, err := lock.MarshalBinary()
Expand Down
1 change: 1 addition & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys) *tikvrpc.Reque
LockTtl: c.lockTTL,
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: c.forUpdateTS,
TxnSize: uint64(len(batch.keys)),
},
Context: pb.Context{
Priority: c.priority,
Expand Down
18 changes: 17 additions & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
// ResolvedCacheSize is max number of cached txn status.
const ResolvedCacheSize = 2048

// bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`.
const bigTxnThreshold = 16

var (
tikvLockResolverCountWithBatchResolve = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve")
tikvLockResolverCountWithExpired = metrics.TiKVLockResolverCounter.WithLabelValues("expired")
Expand All @@ -43,6 +46,7 @@ var (
tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
tikvLockResolverCountWithQueryTxnStatusRolledBack = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_rolled_back")
tikvLockResolverCountWithResolveLocks = metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_locks")
tikvLockResolverCountWithResolveLockLite = metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_lock_lite")
)

// LockResolver resolves locks and also caches resolved txn status.
Expand Down Expand Up @@ -125,6 +129,7 @@ type Lock struct {
Primary []byte
TxnID uint64
TTL uint64
TxnSize uint64
}

func (l *Lock) String() string {
Expand All @@ -137,11 +142,13 @@ func NewLock(l *kvrpcpb.LockInfo) *Lock {
if ttl == 0 {
ttl = defaultLockTTL
}
txnSize := l.GetTxnSize()
return &Lock{
Key: l.GetKey(),
Primary: l.GetPrimaryLock(),
TxnID: l.GetLockVersion(),
TTL: ttl,
TxnSize: txnSize,
}
}

Expand Down Expand Up @@ -388,6 +395,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte

func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error {
tikvLockResolverCountWithResolveLocks.Inc()
cleanWholeRegion := l.TxnSize >= bigTxnThreshold
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
Expand All @@ -405,6 +413,12 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl
if status.IsCommitted() {
req.ResolveLock.CommitVersion = status.CommitTS()
}
if l.TxnSize < bigTxnThreshold {
// Only resolve specified keys when it is a small transaction,
// prevent from scanning the whole region in this case.
tikvLockResolverCountWithResolveLockLite.Inc()
req.ResolveLock.Keys = [][]byte{l.Key}
}
resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand All @@ -429,7 +443,9 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cl
logutil.Logger(context.Background()).Error("resolveLock error", zap.Error(err))
return err
}
cleanRegions[loc.Region] = struct{}{}
if cleanWholeRegion {
cleanRegions[loc.Region] = struct{}{}
}
return nil
}
}

0 comments on commit 328a876

Please sign in to comment.