Skip to content

Commit

Permalink
[cinder-csi-plugin] Refactor list volumes (#2766)
Browse files Browse the repository at this point in the history
* refactor list volumes call

* upgrade tests

* comments improvements

* fix imports and list options

* token split

* add more jointoken  tests
  • Loading branch information
kon-angelo authored Feb 19, 2025
1 parent d6196ca commit 29f3f0b
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 797 deletions.
153 changes: 40 additions & 113 deletions pkg/csi/cinder/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package cinder

import (
"context"
"encoding/json"
"fmt"
"slices"
"sort"
"strconv"

Expand Down Expand Up @@ -126,7 +126,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol

// Volume Create
properties := map[string]string{cinderCSIClusterIDKey: cs.Driver.clusterID}
//Tag volume with metadata if present: https://github.com/kubernetes-csi/external-provisioner/pull/399
// Tag volume with metadata if present: https://github.com/kubernetes-csi/external-provisioner/pull/399
for _, mKey := range sharedcsi.RecognizedCSIProvisionerParams {
if v, ok := req.Parameters[mKey]; ok {
properties[mKey] = v
Expand Down Expand Up @@ -161,7 +161,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
var back *backups.Backup
back, err = cloud.GetBackupByID(ctx, snapshotID)
if err != nil {
//If there is an error getting the backup as well, fail.
// If there is an error getting the backup as well, fail.
return nil, status.Errorf(codes.NotFound, "VolumeContentSource Snapshot or Backup with ID %s not found", snapshotID)
}
if back.Status != "available" {
Expand Down Expand Up @@ -402,12 +402,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

type CloudsStartingToken struct {
CloudName string `json:"cloud"`
Token string `json:"token"`
isEmpty bool
}

func (cs *controllerServer) extractNodeIDs(attachments []volumes.Attachment) []string {
nodeIDs := make([]string, len(attachments))
for i, attachment := range attachments {
Expand Down Expand Up @@ -439,123 +433,56 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
return nil, status.Errorf(codes.InvalidArgument, "[ListVolumes] Invalid max entries request %v, must not be negative ", req.MaxEntries)
}
maxEntries := int(req.MaxEntries)

var err error
var cloudsToken = CloudsStartingToken{
CloudName: "",
Token: "",
isEmpty: len(req.StartingToken) == 0,
}

cloudsNames := maps.Keys(cs.Clouds)
sort.Strings(cloudsNames)

currentCloudName := cloudsNames[0]
var (
token string
idx int
cloudName = cloudsNames[0]
)
if req.StartingToken != "" {
err = json.Unmarshal([]byte(req.StartingToken), &cloudsToken)
if err != nil {
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: Token invalid")
}
currentCloudName = cloudsToken.CloudName
}

startingToken := cloudsToken.Token
var cloudsVentries []*csi.ListVolumesResponse_Entry
var vlist []volumes.Volume
var nextPageToken string

if !cloudsToken.isEmpty && startingToken == "" {
// previous call ended on last volumes from "currentCloudName" we should pass to next one
for i := range cloudsNames {
if cloudsNames[i] == currentCloudName {
currentCloudName = cloudsNames[i+1]
break
}
token, cloudName = splitToken(req.StartingToken)
idx = slices.Index(cloudsNames, cloudName)
if idx < 0 {
return nil, status.Errorf(codes.Internal, "[ListVolumes] Invalid request: %s", fmt.Errorf("unknown cloud specified in the request: %v", cloudName))
}
}

startIdx := 0
for _, cloudName := range cloudsNames {
if cloudName == currentCloudName {
break
}
startIdx++
}
for idx := startIdx; idx < len(cloudsNames); idx++ {
if maxEntries > 0 {
vlist, nextPageToken, err = cs.Clouds[cloudsNames[idx]].ListVolumes(ctx, maxEntries-len(cloudsVentries), startingToken)
} else {
vlist, nextPageToken, err = cs.Clouds[cloudsNames[idx]].ListVolumes(ctx, maxEntries, startingToken)
}
startingToken = nextPageToken
if err != nil {
klog.Errorf("Failed to ListVolumes: %v", err)
if cpoerrors.IsInvalidError(err) {
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
}
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
}

ventries := cs.createVolumeEntries(vlist)
klog.V(4).Infof("ListVolumes: retrieved %d entries and %q next token from cloud %q", len(ventries), nextPageToken, cloudsNames[idx])

cloudsVentries = append(cloudsVentries, ventries...)

// Reach maxEntries setup nextToken with cloud identifier if needed
sendEmptyToken := false
if maxEntries > 0 && len(cloudsVentries) == maxEntries {
if nextPageToken == "" {
if idx+1 == len(cloudsNames) {
// no more entries and no more clouds
// send no token its finished
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), "")
return &csi.ListVolumesResponse{
Entries: cloudsVentries,
NextToken: "",
}, nil
} else {
// still clouds to process
// set token to next non empty cloud
i := 0
for i = idx + 1; i < len(cloudsNames); i++ {
vlistTmp, _, err := cs.Clouds[cloudsNames[i]].ListVolumes(ctx, 1, "")
if err != nil {
klog.Errorf("Failed to ListVolumes: %v", err)
if cpoerrors.IsInvalidError(err) {
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
}
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
}
if len(vlistTmp) > 0 {
cloudsToken.CloudName = cloudsNames[i]
cloudsToken.isEmpty = false
break
}
}
if i == len(cloudsNames) {
sendEmptyToken = true
}
}
}
cloudsToken.CloudName = cloudsNames[idx]
cloudsToken.Token = nextPageToken
var data []byte
data, _ = json.Marshal(cloudsToken)
if sendEmptyToken {
data = []byte("")
}
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), string(data))
return &csi.ListVolumesResponse{
Entries: cloudsVentries,
NextToken: string(data),
}, nil
var volumeList []volumes.Volume
volumeList, token, err = cs.Clouds[cloudName].ListVolumes(ctx, maxEntries, token)
if err != nil {
klog.Errorf("Failed to ListVolumes: %v", err)
if cpoerrors.IsInvalidError(err) {
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
}
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
}
volumeEntries := cs.createVolumeEntries(volumeList)
klog.V(4).Infof("ListVolumes: retrieved %d entries and %q next token from cloud %q", len(volumeEntries), token, cloudName)

switch {
// if we have not finished listing all volumes from this cloud, we will continue on next call.
case token != "":
// if we listed all volumes from this cloud but more clouds exist, return a token of the next cloud.
case idx+1 < len(cloudsNames):
cloudName = cloudsNames[idx+1]
default:
// work is done.
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(volumeEntries), "")
return &csi.ListVolumesResponse{
Entries: volumeEntries,
NextToken: "",
}, nil
}

klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), "")
nextToken := joinToken(token, cloudName)
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(volumeEntries), nextToken)
return &csi.ListVolumesResponse{
Entries: cloudsVentries,
NextToken: "",
Entries: volumeEntries,
NextToken: nextToken,
}, nil
}

Expand Down
Loading

0 comments on commit 29f3f0b

Please sign in to comment.