Skip to content

Commit

Permalink
fix: propagate priority-class label for deploy and statefulset (#4036)
Browse files Browse the repository at this point in the history
Signed-off-by: Abirdcfly <[email protected]>
  • Loading branch information
Abirdcfly authored Jan 23, 2025
1 parent 269534b commit 195224d
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 7 deletions.
3 changes: 1 addition & 2 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ func MaximumExecutionTimeSeconds(job GenericJob) *int32 {
return ptr.To(int32(v))
}

func workloadPriorityClassName(job GenericJob) string {
object := job.Object()
func WorkloadPriorityClassName(object client.Object) string {
if workloadPriorityClassLabel := object.GetLabels()[constants.WorkloadPriorityClassLabel]; workloadPriorityClassLabel != "" {
return workloadPriorityClassLabel
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (r *JobReconciler) prepareWorkload(ctx context.Context, job GenericJob, wl
}

func (r *JobReconciler) extractPriority(ctx context.Context, podSets []kueue.PodSet, job GenericJob) (string, string, int32, error) {
if workloadPriorityClass := workloadPriorityClassName(job); len(workloadPriorityClass) > 0 {
if workloadPriorityClass := WorkloadPriorityClassName(job.Object()); len(workloadPriorityClass) > 0 {
return utilpriority.GetPriorityFromWorkloadPriorityClass(ctx, r.client, workloadPriorityClass)
}
if jobWithPriorityClass, isImplemented := job.(JobWithPriorityClass); isImplemented {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/jobframework/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func ValidateJobOnCreate(job GenericJob) field.ErrorList {
// ValidateJobOnUpdate encapsulates all GenericJob validations that must be performed on a Update operation
func ValidateJobOnUpdate(oldJob, newJob GenericJob) field.ErrorList {
allErrs := validateUpdateForQueueName(oldJob, newJob)
allErrs = append(allErrs, validateUpdateForWorkloadPriorityClassName(oldJob, newJob)...)
allErrs = append(allErrs, ValidateUpdateForWorkloadPriorityClassName(oldJob.Object(), newJob.Object())...)
allErrs = append(allErrs, validateUpdateForMaxExecTime(oldJob, newJob)...)
return allErrs
}
Expand Down Expand Up @@ -119,8 +119,8 @@ func validateUpdateForQueueName(oldJob, newJob GenericJob) field.ErrorList {
return allErrs
}

func validateUpdateForWorkloadPriorityClassName(oldJob, newJob GenericJob) field.ErrorList {
allErrs := apivalidation.ValidateImmutableField(workloadPriorityClassName(newJob), workloadPriorityClassName(oldJob), workloadPriorityClassNamePath)
func ValidateUpdateForWorkloadPriorityClassName(oldObj, newObj client.Object) field.ErrorList {
allErrs := apivalidation.ValidateImmutableField(WorkloadPriorityClassName(newObj), WorkloadPriorityClassName(oldObj), workloadPriorityClassNamePath)
return allErrs
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/jobs/deployment/deployment_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
}
deployment.Spec.Template.Labels[constants.QueueLabel] = queueName
}
if priorityClass := jobframework.WorkloadPriorityClassName(deployment.Object()); priorityClass != "" {
deployment.Spec.Template.Labels[constants.WorkloadPriorityClassLabel] = priorityClass
}

return nil
}
Expand Down Expand Up @@ -107,6 +110,10 @@ func (wh *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Ob
oldDeployment.Spec.Template.GetLabels()[constants.QueueLabel],
podSpecQueueNameLabelPath,
)...)
allErrs = append(allErrs, jobframework.ValidateUpdateForWorkloadPriorityClassName(
oldDeployment.Object(),
newDeployment.Object(),
)...)

return warnings, allErrs.ToAggregate()
}
Expand Down
53 changes: 53 additions & 0 deletions pkg/controller/jobs/deployment/deployment_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingdeployment "sigs.k8s.io/kueue/pkg/util/testingjobs/deployment"
Expand All @@ -48,6 +49,42 @@ func TestDefault(t *testing.T) {
PodTemplateSpecQueue("test-queue").
Obj(),
},
"deployment with queue and priority class": {
deployment: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "test").
Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "test").
PodTemplateSpecQueue("test-queue").
PodTemplateSpecLabel(constants.WorkloadPriorityClassLabel, "test").
Obj(),
},
"deployment with queue, priority class and pod template spec queue, priority class": {
deployment: testingdeployment.MakeDeployment("test-pod", "").
Queue("new-test-queue").
Label(constants.WorkloadPriorityClassLabel, "new-test").
PodTemplateSpecQueue("test-queue").
PodTemplateSpecLabel(constants.WorkloadPriorityClassLabel, "test").
Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
Queue("new-test-queue").
Label(constants.WorkloadPriorityClassLabel, "new-test").
PodTemplateSpecQueue("new-test-queue").
PodTemplateSpecLabel(constants.WorkloadPriorityClassLabel, "new-test").
Obj(),
},
"deployment without queue with pod template spec queue and priority class": {
deployment: testingdeployment.MakeDeployment("test-pod", "").
PodTemplateSpecQueue("test-queue").
PodTemplateSpecLabel(constants.WorkloadPriorityClassLabel, "test").
Obj(),
want: testingdeployment.MakeDeployment("test-pod", "").
PodTemplateSpecQueue("test-queue").
PodTemplateSpecLabel(constants.WorkloadPriorityClassLabel, "test").
Obj(),
},
}

for name, tc := range testCases {
Expand Down Expand Up @@ -152,6 +189,22 @@ func TestValidateUpdate(t *testing.T) {
},
}.ToAggregate(),
},
"update priority-class": {
oldDeployment: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "test").
Obj(),
newDeployment: testingdeployment.MakeDeployment("test-pod", "").
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "new-test").
Obj(),
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeInvalid,
Field: "metadata.labels[kueue.x-k8s.io/priority-class]",
},
}.ToAggregate(),
},
}

for name, tc := range testCases {
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/jobs/statefulset/statefulset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
}
ss.Spec.Template.Annotations[pod.GroupTotalCountAnnotation] = fmt.Sprint(ptr.Deref(ss.Spec.Replicas, 1))
ss.Spec.Template.Annotations[pod.GroupFastAdmissionAnnotation] = "true"

if priorityClass := jobframework.WorkloadPriorityClassName(ss.Object()); priorityClass != "" {
ss.Spec.Template.Labels[constants.WorkloadPriorityClassLabel] = priorityClass
}
return nil
}

Expand All @@ -103,6 +105,7 @@ var (
statefulsetLabelsPath = field.NewPath("metadata", "labels")
statefulsetQueueNameLabelPath = statefulsetLabelsPath.Key(constants.QueueLabel)
statefulsetReplicasPath = field.NewPath("spec", "replicas")
priorityClassNameLabelPath = statefulsetLabelsPath.Key(constants.WorkloadPriorityClassLabel)
statefulsetGroupNameLabelPath = statefulsetLabelsPath.Key(pod.GroupNameLabel)

podSpecQueueNameLabelPath = field.NewPath("spec", "template", "metadata", "labels").
Expand Down Expand Up @@ -130,6 +133,10 @@ func (wh *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Ob
oldStatefulSet.GetLabels()[pod.GroupNameLabel],
statefulsetGroupNameLabelPath,
)...)
allErrs = append(allErrs, jobframework.ValidateUpdateForWorkloadPriorityClassName(
oldStatefulSet.Object(),
newStatefulSet.Object(),
)...)

if isManagedByKueue(newStatefulSet.Object()) {
// TODO(#3279): support resizes later
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/jobs/statefulset/statefulset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ func TestDefault(t *testing.T) {
PodTemplateSpecPodGroupFastAdmissionAnnotation(true).
Obj(),
},
"statefulset with queue and priority class": {
enableIntegrations: []string{"pod"},
statefulset: testingstatefulset.MakeStatefulSet("test-pod", "").
Replicas(10).
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "test").
Obj(),
want: testingstatefulset.MakeStatefulSet("test-pod", "").
Replicas(10).
Queue("test-queue").
Label(constants.WorkloadPriorityClassLabel, "test").
PodTemplateSpecQueue("test-queue").
PodTemplateSpecLabel(constants.WorkloadPriorityClassLabel, "test").
PodTemplateSpecPodGroupNameLabel("test-pod", "", gvk).
PodTemplateSpecPodGroupTotalCountAnnotation(10).
PodTemplateSpecPodGroupFastAdmissionAnnotation(true).
Obj(),
},
"statefulset without replicas": {
enableIntegrations: []string{"pod"},
statefulset: testingstatefulset.MakeStatefulSet("test-pod", "").
Expand Down Expand Up @@ -208,6 +226,30 @@ func TestValidateUpdate(t *testing.T) {
},
}.ToAggregate(),
},
"change in priority class label": {
oldObj: &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
constants.QueueLabel: "queue1",
constants.WorkloadPriorityClassLabel: "priority1",
},
},
},
newObj: &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
constants.QueueLabel: "queue1",
constants.WorkloadPriorityClassLabel: "priority2",
},
},
},
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeInvalid,
Field: priorityClassNameLabelPath.String(),
},
}.ToAggregate(),
},
"change in pod template queue label": {
oldObj: &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Expand Down

0 comments on commit 195224d

Please sign in to comment.