Skip to content

Commit

Permalink
Make parallel restore configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Qiu <[email protected]>
  • Loading branch information
qiuming-best committed Mar 11, 2024
1 parent 84c1eca commit a2202e0
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 7 deletions.
3 changes: 3 additions & 0 deletions config/crd/v1/bases/velero.io_restores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,9 @@ spec:
description: UploaderConfig specifies the configuration for the restore.
nullable: true
properties:
parallel:
description: Parallel is the number of parallel for restore.
type: integer
writeSparseFiles:
description: WriteSparseFiles is a flag to indicate whether write
files sparsely or not.
Expand Down
2 changes: 1 addition & 1 deletion config/crd/v1/crds/crds.go

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions design/Implemented/velero-uploader-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,38 @@ Roughly, the process is as follows:
4. Each respective controller within the CRs calls the uploader, and the WriteSparseFiles from map in CRs is passed to the uploader.
5. When the uploader subsequently calls the Kopia API, it can use the WriteSparseFiles to set the WriteSparseFiles parameter, and if the uploader calls the Restic command it would append `--sparse` flag within the restore command.

### Parallel Restore
Setting the parallelism of restore operations can improve the efficiency and speed of the recovery process, especially when dealing with large amounts of data.

### Velero CLI
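The Velero CLI will support a `--parallel` flag, allowing users to set the parallelism value when creating restores. The value is honored by the Kopia uploader; the Restic uploader does not support parallel restore and returns an error when a positive value is configured.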

### UploaderConfig
The sub-option `parallel` is added to `UploaderConfig` as shown below:

```go
// UploaderConfigForRestore defines the configuration for the restore.
type UploaderConfigForRestore struct {
// Parallel is a flag to set the parallelism value for restore
// +optional
Parallel int `json:"parallel,omitempty"`
}
```

Kopia Parallel Restore Policy

Check failure on line 198 in design/Implemented/velero-uploader-configuration.md

View workflow job for this annotation

GitHub Actions / Run Codespell

Resotre ==> Restore
Velero Uploader can set restore policies when calling Kopia APIs. In the Kopia codebase, the structure for restore policies is defined as follows:

// UploadPolicy describes the policy to apply when uploading snapshots.
type UploadPolicy struct {
...
MaxParallelFileReads *OptionalInt `json:"maxParallelFileReads,omitempty"`
}
Velero can set the MaxParallelFileReads parameter for Kopia's upload policy as follows:

curPolicy := getDefaultPolicy()
if parallelUpload > 0 {
curPolicy.UploadPolicy.MaxParallelFileReads = newOptionalInt(parallelUpload)
}

## Alternatives Considered
To enhance extensibility further, the option of storing `UploaderConfig` in a Kubernetes ConfigMap can be explored, this approach would allow the addition and modification of configuration options without the need to modify the CRD.
3 changes: 3 additions & 0 deletions pkg/apis/velero/v1/restore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type UploaderConfigForRestore struct {
// +optional
// +nullable
WriteSparseFiles *bool `json:"writeSparseFiles,omitempty"`
// Parallel is the number of restore operations to run in parallel.
// +optional
Parallel int `json:"parallel,omitempty"`
}

// RestoreHooks contains custom behaviors that should be executed during or post restore.
Expand Down
6 changes: 6 additions & 0 deletions pkg/builder/restore_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,9 @@ func (b *RestoreBuilder) WriteSparseFiles(val bool) *RestoreBuilder {
b.object.Spec.UploaderConfig.WriteSparseFiles = &val
return b
}

// RestoreConcurrency stores val in the Parallel field of the Restore's
// uploader configuration and returns the builder so calls can be chained.
// The write goes straight through Spec.UploaderConfig; no validation of val
// or of the uploader configuration is performed here.
func (b *RestoreBuilder) RestoreConcurrency(val int) *RestoreBuilder {
	spec := &b.object.Spec
	spec.UploaderConfig.Parallel = val
	return b
}
8 changes: 8 additions & 0 deletions pkg/cmd/cli/restore/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type CreateOptions struct {
ItemOperationTimeout time.Duration
ResourceModifierConfigMap string
WriteSparseFiles flag.OptionalBool
Parallel int
client kbclient.WithWatch
}

Expand Down Expand Up @@ -151,6 +152,8 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {

f = flags.VarPF(&o.WriteSparseFiles, "write-sparse-files", "", "Whether to write sparse files during restoring volumes")
f.NoOptDefVal = cmd.TRUE

flags.IntVar(&o.Parallel, "parallel", 0, "The number of restore operations to run in parallel. If set to 0, the default parallelism will be the number of CPUs allocated.")
}

func (o *CreateOptions) Complete(args []string, f client.Factory) error {
Expand Down Expand Up @@ -200,6 +203,10 @@ func (o *CreateOptions) Validate(c *cobra.Command, args []string, f client.Facto
return errors.New("existing-resource-policy has invalid value, it accepts only none, update as value")
}

if o.Parallel < 0 {
return errors.New("parallel cannot be negative")
}

switch {
case o.BackupName != "":
backup := new(api.Backup)
Expand Down Expand Up @@ -325,6 +332,7 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
},
UploaderConfig: &api.UploaderConfigForRestore{
WriteSparseFiles: o.WriteSparseFiles.Value,
Parallel: o.Parallel,
},
},
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/cli/restore/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestCreateCommand(t *testing.T) {
allowPartiallyFailed := "true"
itemOperationTimeout := "10m0s"
writeSparseFiles := "true"

parallel := 2
flags := new(pflag.FlagSet)
o := NewCreateOptions()
o.BindFlags(flags)
Expand All @@ -108,6 +108,7 @@ func TestCreateCommand(t *testing.T) {
flags.Parse([]string{"--allow-partially-failed", allowPartiallyFailed})
flags.Parse([]string{"--item-operation-timeout", itemOperationTimeout})
flags.Parse([]string{"--write-sparse-files", writeSparseFiles})
flags.Parse([]string{"--parallel", "2"})
client := velerotest.NewFakeControllerRuntimeClient(t).(kbclient.WithWatch)

f.On("Namespace").Return(mock.Anything)
Expand Down Expand Up @@ -144,6 +145,7 @@ func TestCreateCommand(t *testing.T) {
require.Equal(t, allowPartiallyFailed, o.AllowPartiallyFailed.String())
require.Equal(t, itemOperationTimeout, o.ItemOperationTimeout.String())
require.Equal(t, writeSparseFiles, o.WriteSparseFiles.String())
require.Equal(t, parallel, o.Parallel)
})

t.Run("create a restore from schedule", func(t *testing.T) {
Expand Down
14 changes: 11 additions & 3 deletions pkg/cmd/util/output/restore_describer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,16 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
d.Println()
d.Printf("Preserve Service NodePorts:\t%s\n", BoolPointerString(restore.Spec.PreserveNodePorts, "false", "true", "auto"))

if restore.Spec.UploaderConfig != nil && boolptr.IsSetToTrue(restore.Spec.UploaderConfig.WriteSparseFiles) {
d.Println()
DescribeUploaderConfigForRestore(d, restore.Spec)
if restore.Spec.UploaderConfig != nil {
if boolptr.IsSetToTrue(restore.Spec.UploaderConfig.WriteSparseFiles) {
d.Println()
DescribeUploaderConfigForRestore(d, restore.Spec)
}

if restore.Spec.UploaderConfig.Parallel > 0 {
d.Println()
d.Printf("Parallelism:\t%d\n", restore.Spec.UploaderConfig.Parallel)
}
}

d.Println()
Expand All @@ -203,6 +210,7 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
// DescribeUploaderConfigForRestore prints the uploader settings carried by a
// restore spec: whether sparse-file writing is enabled and the configured
// restore parallelism. Nothing is printed when spec.UploaderConfig is nil.
func DescribeUploaderConfigForRestore(d *Describer, spec velerov1api.RestoreSpec) {
	// UploaderConfig is a pointer; guard it so a direct caller cannot trigger
	// a nil dereference.
	if spec.UploaderConfig == nil {
		return
	}

	d.Printf("Uploader config:\n")
	// %t renders the boolean value; %T would render the type name ("bool").
	d.Printf("\tWrite Sparse Files:\t%t\n", boolptr.IsSetToTrue(spec.UploaderConfig.WriteSparseFiles))
	d.Printf("\tParallel Restore:\t%d\n", spec.UploaderConfig.Parallel)
}

func describeRestoreItemOperations(ctx context.Context, kbClient kbclient.Client, d *Describer, restore *velerov1api.Restore, details bool, insecureSkipTLSVerify bool, caCertPath string) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/uploader/kopia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
IgnorePermissionErrors: true,
}

restoreConcurrency := runtime.NumCPU()

if len(uploaderCfg) > 0 {
writeSparseFiles, err := uploaderutil.GetWriteSparseFiles(uploaderCfg)
if err != nil {
Expand All @@ -419,9 +421,17 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
if writeSparseFiles {
fsOutput.WriteSparseFiles = true
}

concurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg)
if err != nil {
return 0, 0, errors.Wrap(err, "failed to get parallel restore uploader config")
}
if restoreConcurrency > 0 {
restoreConcurrency = concurrency
}
}

log.Debugf("Restore filesystem output %v", fsOutput)
log.Debugf("Restore filesystem output %v, concurrency %d", fsOutput, restoreConcurrency)

err = fsOutput.Init(ctx)
if err != nil {
Expand All @@ -436,7 +446,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
}

stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
Parallel: runtime.NumCPU(),
Parallel: restoreConcurrency,
RestoreDirEntryAtDepth: math.MaxInt32,
Cancel: cancleCh,
ProgressCallback: func(ctx context.Context, stats restore.Stats) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/uploader/provider/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,5 +246,9 @@ func (rp *resticProvider) parseRestoreExtraFlags(uploaderCfg map[string]string)
extraFlags = append(extraFlags, "--sparse")
}

if restoreConcurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg); err == nil && restoreConcurrency > 0 {
return extraFlags, errors.New("restic does not support parallel restore")
}

return extraFlags, nil
}
7 changes: 7 additions & 0 deletions pkg/uploader/provider/restic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,13 @@ func TestParseUploaderConfig(t *testing.T) {
},
expectedFlags: []string{},
},
{
name: "RestoreConcorrency",
uploaderConfig: map[string]string{
"Parallel": "5",
},
expectedFlags: []string{},
},
}

for _, testCase := range testCases {
Expand Down
17 changes: 17 additions & 0 deletions pkg/uploader/util/uploader_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
const (
ParallelFilesUpload = "ParallelFilesUpload"
WriteSparseFiles = "WriteSparseFiles"
RestoreConcurrency = "RestoreConcurrency"
)

func StoreBackupConfig(config *velerov1api.UploaderConfigForBackup) map[string]string {
Expand All @@ -42,6 +43,10 @@ func StoreRestoreConfig(config *velerov1api.UploaderConfigForRestore) map[string
} else {
data[WriteSparseFiles] = strconv.FormatBool(false)
}

if config.Parallel > 0 {
data[RestoreConcurrency] = strconv.Itoa(config.Parallel)
}
return data
}

Expand All @@ -68,3 +73,15 @@ func GetWriteSparseFiles(uploaderCfg map[string]string) (bool, error) {
}
return false, nil
}

// GetRestoreConcurrency looks up the RestoreConcurrency entry in uploaderCfg
// and converts it to an int.
//
// It returns 0 with a nil error when the key is absent, and 0 with a wrapped
// error when the stored value cannot be parsed as a decimal integer.
func GetRestoreConcurrency(uploaderCfg map[string]string) (int, error) {
	raw, found := uploaderCfg[RestoreConcurrency]
	if !found {
		return 0, nil
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return 0, errors.Wrap(err, "failed to parse RestoreConcurrency config")
	}

	return parsed, nil
}
10 changes: 10 additions & 0 deletions pkg/uploader/util/uploader_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ func TestStoreRestoreConfig(t *testing.T) {
WriteSparseFiles: "false", // Assuming default value is false for nil case
},
},
{
name: "Parallel is set",
config: &velerov1api.UploaderConfigForRestore{
Parallel: 5,
},
expectedData: map[string]string{
RestoreConcurrency: "5",
WriteSparseFiles: "false",
},
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit a2202e0

Please sign in to comment.