Skip to content

Commit

Permalink
br: add retry for raw kv client put (#58963) (#59078)
Browse files Browse the repository at this point in the history
close #58845
  • Loading branch information
ti-chi-bot authored Feb 8, 2025
1 parent 4fb3bea commit 4e52165
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 4 deletions.
12 changes: 11 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,7 @@ func (rc *Client) restoreMetaKvEntries(
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))

if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}

Expand Down Expand Up @@ -2933,3 +2933,13 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}

func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error {
err := utils.WithRetry(ctx, func() error {
return client.Put(ctx, key, value, originTs)
}, utils.NewRawClientBackoffStrategy())
if err != nil {
return errors.Errorf("failed to put raw kv after retry")
}
return nil
}
67 changes: 67 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/tablecodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"golang.org/x/exp/slices"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -1865,3 +1866,69 @@ func TestCheckNewCollationEnable(t *testing.T) {
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}

type mockRawKVClient struct {
restore.RawkvClient
putCount int
errThreshold int
}

func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
m.putCount += 1
if m.errThreshold >= m.putCount {
return errors.New("rpcClient is idle")
}
return nil
}

func TestPutRawKvWithRetry(t *testing.T) {
tests := []struct {
name string
errThreshold int
cancelAfter time.Duration
wantErr string
wantPuts int
}{
{
name: "success on first try",
errThreshold: 0,
wantPuts: 1,
},
{
name: "success on after failure",
errThreshold: 2,
wantPuts: 3,
},
{
name: "fails all retries",
errThreshold: 5,
wantErr: "failed to put raw kv after retry",
wantPuts: 5,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockRawClient := &mockRawKVClient{
errThreshold: tt.errThreshold,
}
client := restore.NewRawKVBatchClient(mockRawClient, 1)

ctx := context.Background()
if tt.cancelAfter > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
defer cancel()
}

err := restore.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)

if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
})
}
}
Empty file.
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const (
flashbackRetryTime = 3
flashbackWaitInterval = 3000 * time.Millisecond
flashbackMaxWaitInterval = 15 * time.Second

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second
)

// ConstantBackoff is a backoffer that retry forever until success.
Expand Down Expand Up @@ -294,3 +298,35 @@ func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration {
func (bo *flashbackBackoffer) Attempt() int {
return bo.attempt
}

type RawClientBackoffStrategy struct {
Attempts int
BaseBackoff time.Duration
MaxBackoff time.Duration
}

func NewRawClientBackoffStrategy() Backoffer {
return &RawClientBackoffStrategy{
Attempts: rawClientMaxAttempts,
BaseBackoff: rawClientDelayTime,
MaxBackoff: rawClientMaxDelayTime,
}
}

// NextBackoff returns a duration to wait before retrying again
func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > b.MaxBackoff {
b.BaseBackoff = b.MaxBackoff
}
return bo
}

func (b *RawClientBackoffStrategy) Attempt() int {
return b.Attempts
}
6 changes: 3 additions & 3 deletions build/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "nogo")
load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
load("@io_bazel_rules_go//go:def.bzl", "nogo")
load("//build/linter/staticcheck:def.bzl", "staticcheck_analyzers")

package(default_visibility = ["//visibility:public"])

bool_flag(
name = "with_nogo_flag",
build_setting_default = False,
Expand Down

0 comments on commit 4e52165

Please sign in to comment.