Skip to content

Commit

Permalink
store/tikv: deduplicate lock keys (#10800) (#10803)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jun 13, 2019
1 parent 328a876 commit 872fa42
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
8 changes: 1 addition & 7 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,7 @@ func (st *TxnState) KeysNeedToLock() ([]kv.Key, error) {
if !keyNeedToLock(k, v) {
return nil
}
if mb := st.Transaction.GetMemBuffer(); mb != nil {
_, err1 := mb.Get(k)
if err1 == nil {
// Key is already in txn MemBuffer, must already been locked, we don't need to lock it again.
return nil
}
}
// If the key is already locked, it will be deduplicated in LockKeys method later.
// The statement MemBuffer will be reused, so we must copy the key here.
keys = append(keys, append([]byte{}, k...))
return nil
Expand Down
10 changes: 10 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,13 @@ func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
c.Assert(len(req.Prewrite.IsPessimisticLock), Greater, 0)
c.Assert(req.Prewrite.ForUpdateTs, Equals, uint64(100))
}

func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
c.Assert(txn.lockKeys, HasLen, 2)
}
30 changes: 19 additions & 11 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type tikvTxn struct {
commitTS uint64
valid bool
lockKeys [][]byte
lockedMap map[string]struct{}
mu sync.Mutex // For thread-safe LockKeys function.
dirty bool
setCnt int64
Expand All @@ -85,6 +86,7 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
return &tikvTxn{
snapshot: snapshot,
us: kv.NewUnionStore(snapshot),
lockedMap: map[string]struct{}{},
store: store,
startTS: startTS,
startTime: time.Now(),
Expand Down Expand Up @@ -344,7 +346,16 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {
return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys)
}

func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv.Key) error {
func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput ...kv.Key) error {
// Exclude keys that are already locked.
keys := make([][]byte, 0, len(keysInput))
txn.mu.Lock()
for _, key := range keysInput {
if _, ok := txn.lockedMap[string(key)]; !ok {
keys = append(keys, key)
}
}
txn.mu.Unlock()
if len(keys) == 0 {
return nil
}
Expand All @@ -366,17 +377,13 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv
}

bo := NewBackoffer(ctx, pessimisticLockMaxBackoff).WithVars(txn.vars)
keys1 := make([][]byte, len(keys))
for i, key := range keys {
keys1[i] = key
}
txn.committer.forUpdateTS = forUpdateTS
// If the number of keys1 greater than 1, it can be on different region,
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys1) == 1
err := txn.committer.pessimisticLockKeys(bo, keys1)
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
err := txn.committer.pessimisticLockKeys(bo, keys)
if err != nil {
wg := txn.asyncPessimisticRollback(ctx, keys1)
wg := txn.asyncPessimisticRollback(ctx, keys)
if dl, ok := errors.Cause(err).(*ErrDeadlock); ok && hashInKeys(dl.DeadlockKeyHash, keys) {
dl.IsRetryable = true
// Wait for the pessimistic rollback to finish before we retry the statement.
Expand All @@ -388,8 +395,9 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv
}
}
txn.mu.Lock()
txn.lockKeys = append(txn.lockKeys, keys...)
for _, key := range keys {
txn.lockKeys = append(txn.lockKeys, key)
txn.lockedMap[string(key)] = struct{}{}
}
txn.dirty = true
txn.mu.Unlock()
Expand Down Expand Up @@ -417,7 +425,7 @@ func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte)
return wg
}

func hashInKeys(deadlockKeyHash uint64, keys []kv.Key) bool {
func hashInKeys(deadlockKeyHash uint64, keys [][]byte) bool {
for _, key := range keys {
if farm.Fingerprint64(key) == deadlockKeyHash {
return true
Expand Down

0 comments on commit 872fa42

Please sign in to comment.