From ada1013e6261192551c746cd18269423a74fcb95 Mon Sep 17 00:00:00 2001 From: live-wire Date: Thu, 27 Jul 2023 11:38:51 +0200 Subject: [PATCH 1/2] Make sure application mode clusters aren't stuck in status updating --- controllers/flinkcluster/flinkcluster_reconciler.go | 9 +++++++++ controllers/flinkcluster/flinkcluster_updater.go | 12 ++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 579e0fad..61e05519 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -478,6 +478,15 @@ func (reconciler *ClusterReconciler) reconcileJob(ctx context.Context) (ctrl.Res if err != nil { return requeueResult, err } + } else if observedSubmitter.Status.Failed >= 1 { + log.Info("Found failed job submitter") + err = reconciler.deleteJob(ctx, observedSubmitter) + if err != nil { + return requeueResult, err + } + } else { + log.Info("Found job submitter, wait for it to be active or failed") + return requeueResult, nil } } else { err = reconciler.createJob(ctx, desiredJob) diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index 378ad5b3..a76dccd8 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -22,10 +22,11 @@ package flinkcluster import ( "encoding/json" "fmt" - batchv1 "k8s.io/api/batch/v1" "reflect" "time" + batchv1 "k8s.io/api/batch/v1" + "golang.org/x/net/context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -659,10 +660,13 @@ func (updater *ClusterStatusUpdater) deriveJobStatus(ctx context.Context) *v1bet newJobState = v1beta1.JobStatePending case shouldUpdateJob(&observed): newJobState = v1beta1.JobStateUpdating - case oldJob.ShouldRestart(jobSpec): - newJobState = v1beta1.JobStateRestarting case oldJob.IsStopped(): - newJobState = oldJob.State + // When a new job is deploying, update the job state to deploying. + if observedSubmitter.job != nil && observedSubmitter.job.Status.Active == 1 { + newJobState = v1beta1.JobStateDeploying + } else { + newJobState = oldJob.State + } case oldJob.IsPending() && oldJob.DeployTime != "": newJobState = v1beta1.JobStateDeploying // Derive the job state from the observed Flink job, if it exists. From bbafcc1ecacecaffb087cb3445cf0e2783c41b9c Mon Sep 17 00:00:00 2001 From: live-wire Date: Thu, 17 Aug 2023 15:25:38 +0200 Subject: [PATCH 2/2] Job status takes a brief moment to publish --- controllers/flinkcluster/flinkcluster_reconciler.go | 1 + controllers/flinkcluster/flinkcluster_updater.go | 2 +- controllers/flinkcluster/flinkcluster_util.go | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 61e05519..f8b2cfcb 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -576,6 +576,7 @@ func (reconciler *ClusterReconciler) createJob(ctx context.Context, job *batchv1 } else { log.Info("Job submitter created") } + return err } diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index a76dccd8..914bd728 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -662,7 +662,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus(ctx context.Context) *v1bet newJobState = v1beta1.JobStateUpdating case oldJob.IsStopped(): // When a new job is deploying, update the job state to deploying. - if observedSubmitter.job != nil && observedSubmitter.job.Status.Active == 1 { + if observedSubmitter.job != nil && (observedSubmitter.job.Status.Active == 1 || isJobInitialising(observedSubmitter.job.Status)) { newJobState = v1beta1.JobStateDeploying } else { newJobState = oldJob.State diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go index bf2dfa81..9079b113 100644 --- a/controllers/flinkcluster/flinkcluster_util.go +++ b/controllers/flinkcluster/flinkcluster_util.go @@ -600,3 +600,7 @@ func GenJobId(cluster *v1beta1.FlinkCluster) (string, error) { hash := md5.Sum([]byte(cluster.Status.Revision.NextRevision)) return hex.EncodeToString(hash[:]), nil } + +func isJobInitialising(jobStatus batchv1.JobStatus) bool { + return jobStatus.Active == 0 && jobStatus.Succeeded == 0 && jobStatus.Failed == 0 +}