Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add retry for raw kv client put (#58963) #59081

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3176,7 +3176,7 @@ func (rc *Client) restoreMetaKvEntries(
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
Expand Down Expand Up @@ -3802,3 +3802,13 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}

func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error {
err := utils.WithRetry(ctx, func() error {
return client.Put(ctx, key, value, originTs)
}, utils.NewRawClientBackoffStrategy())
if err != nil {
return errors.Errorf("failed to put raw kv after retry")
}
return nil
}
67 changes: 67 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/keepalive"
)
Expand Down Expand Up @@ -1928,3 +1929,69 @@ func TestCheckNewCollationEnable(t *testing.T) {
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}

type mockRawKVClient struct {
rawkv.Client
putCount int
errThreshold int
}

func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
m.putCount += 1
if m.errThreshold >= m.putCount {
return errors.New("rpcClient is idle")
}
return nil
}

func TestPutRawKvWithRetry(t *testing.T) {
tests := []struct {
name string
errThreshold int
cancelAfter time.Duration
wantErr string
wantPuts int
}{
{
name: "success on first try",
errThreshold: 0,
wantPuts: 1,
},
{
name: "success on after failure",
errThreshold: 2,
wantPuts: 3,
},
{
name: "fails all retries",
errThreshold: 5,
wantErr: "failed to put raw kv after retry",
wantPuts: 5,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockRawClient := &mockRawKVClient{
errThreshold: tt.errThreshold,
}
client := restore.NewRawKVBatchClient(mockRawClient, 1)

ctx := context.Background()
if tt.cancelAfter > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
defer cancel()
}

err := restore.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)

if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
})
}
}
Empty file.
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
ChecksumMaxWaitInterval = 30 * time.Second

gRPC_Cancel = "the client connection is closing"

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second
)

// At least, there are two possible cancel() call,
Expand Down Expand Up @@ -300,3 +304,35 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}

type RawClientBackoffStrategy struct {
Attempts int
BaseBackoff time.Duration
MaxBackoff time.Duration
}

func NewRawClientBackoffStrategy() Backoffer {
return &RawClientBackoffStrategy{
Attempts: rawClientMaxAttempts,
BaseBackoff: rawClientDelayTime,
MaxBackoff: rawClientMaxDelayTime,
}
}

// NextBackoff returns a duration to wait before retrying again
func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > b.MaxBackoff {
b.BaseBackoff = b.MaxBackoff
}
return bo
}

func (b *RawClientBackoffStrategy) Attempt() int {
return b.Attempts
}
2 changes: 1 addition & 1 deletion br/tests/br_file_corruption/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ for filename in $(find $TEST_DIR/$DB -name "*.sst_bak"); do
mv "$filename" "${filename%_bak}"
done

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/full-restore-validate-checksum=return(true)"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/client/full-restore-validate-checksum=return(true)"
restore_fail=0
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true || restore_fail=1
export GO_FAILPOINTS=""
Expand Down
51 changes: 51 additions & 0 deletions br/tests/br_test_utils.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/sh
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

wait_log_checkpoint_advance() {
local task_name=${1:-$TASK_NAME}
echo "wait for log checkpoint to advance for task: $task_name"
sleep 10
local current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
echo "current ts: $current_ts"
i=0
while true; do
# extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty
log_backup_status=$(unset BR_LOG_TO_TERM && run_br --skip-goleak --pd $PD_ADDR log status --task-name $task_name --json 2>br.log)
echo "log backup status: $log_backup_status"
local checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
echo "checkpoint ts: $checkpoint_ts"

# check whether the checkpoint ts is a number
if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
if [ $checkpoint_ts -gt $current_ts ]; then
echo "the checkpoint has advanced"
break
fi
echo "the checkpoint hasn't advanced"
i=$((i+1))
if [ "$i" -gt 50 ]; then
echo 'the checkpoint lag is too large'
exit 1
fi
sleep 10
else
echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!"
exit 1
fi
done
}
2 changes: 1 addition & 1 deletion br/tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mkdir $COV_DIR
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_check_dup_table"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history br_pitr_failpoint br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
Expand Down