Skip to content

Commit

Permalink
add retry for raw kv client put
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Jan 15, 2025
1 parent 6180fb7 commit 5e698b5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 7 deletions.
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 46,
shard_count = 47,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/gluetidb",
"//br/pkg/mock",
"//br/pkg/restore",
"//br/pkg/restore/internal/import_client",
"//br/pkg/restore/internal/rawkv",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
Expand Down Expand Up @@ -128,6 +129,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//rawkv",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
Expand Down
41 changes: 35 additions & 6 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,17 @@ import (
"google.golang.org/grpc/keepalive"
)

const MetaKVBatchSize = 64 * 1024 * 1024
const maxSplitKeysOnce = 10240

// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
const rawKVBatchCount = 64
const (
MetaKVBatchSize = 64 * 1024 * 1024
maxSplitKeysOnce = 10240
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
rawKVBatchCount = 64

// raw KV retry constants
rawKVMaxRetries = 5
rawKVInitialRetryInterval = 500 * time.Millisecond
rawKVMaxRetryInterval = 5 * time.Second
)

// LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration,
// including concurrency management, checkpoint handling, and file importing for efficient log processing.
Expand Down Expand Up @@ -1401,7 +1407,7 @@ func (rc *LogClient) restoreMetaKvEntries(
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
Expand Down Expand Up @@ -1961,3 +1967,26 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore(

return eg.Wait()
}

func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key, value []byte, originTs uint64) error {
retry := utils.InitialRetryState(rawKVMaxRetries, rawKVInitialRetryInterval, rawKVMaxRetryInterval)
for retry.ShouldRetry() {
err := client.Put(ctx, key, value, originTs)
if err == nil {
return nil
}
retryAfter := retry.ExponentialBackoff()
log.Warn(
"raw kv client put got error, retrying",
logutil.ShortError(err),
zap.Duration("retry-after", retryAfter),
)

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryAfter):
}
}
return errors.Errorf("max retry attempts (%d) exceeded for raw kv put", rawKVMaxRetries)
}
76 changes: 76 additions & 0 deletions br/pkg/restore/log_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
rawclient "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv"
logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/utils"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/util/sqlexec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -1986,3 +1988,77 @@ func fakeRowKey(tableID, rowID int64) kv.Key {
func fakeRowRawKey(tableID, rowID int64) kv.Key {
return tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))
}

type mockRawKVClient struct {
rawkv.Client
putCount int
errThreshold int
}

func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
m.putCount += 1
if m.errThreshold >= m.putCount {
return errors.New("rpcClient is idle")
}
return nil
}

func TestPutRawKvWithRetry(t *testing.T) {
tests := []struct {
name string
errThreshold int
cancelAfter time.Duration
wantErr string
wantPuts int
}{
{
name: "success on first try",
errThreshold: 0,
wantPuts: 1,
},
{
name: "success on after failure",
errThreshold: 2,
wantPuts: 3,
},
{
name: "fails all retries",
errThreshold: 5,
wantErr: "max retry attempts",
wantPuts: 5,
},
{
name: "context cancelled",
errThreshold: 5,
cancelAfter: 100 * time.Millisecond,
wantErr: "context deadline exceeded",
wantPuts: 1,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mockRawClient := &mockRawKVClient{
errThreshold: tt.errThreshold,
}
client := rawclient.NewRawKVBatchClient(mockRawClient, 1)

ctx := context.Background()
if tt.cancelAfter > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
defer cancel()
}

err := logclient.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)

if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
})
}
}

0 comments on commit 5e698b5

Please sign in to comment.