diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index 0b7ca80a..1ca27732 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -24,8 +24,10 @@ import (
 	"math"
 	"math/rand"
 	"reflect"
+	"regexp"
 	"runtime/debug"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -64,6 +66,13 @@ import (
 // defaultBackoffTime is the default backoff time in seconds
 const defaultBackoffTime = 20
 
+var (
+	// Regex variables for extracting resource values
+	cpuRegex    = regexp.MustCompile(`cpu (\d+(\.\d+)?)`)
+	memoryRegex = regexp.MustCompile(`memory (\d+(\.\d+)?)`)
+	gpuRegex    = regexp.MustCompile(`GPU (\d+)`)
+)
+
 // XController the AppWrapper Controller type
 type XController struct {
 	// MCAD configuration
@@ -216,7 +225,7 @@ func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource {
 			}
 		}
 	}
-	klog.Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime))
+	klog.V(2).Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and the time taken to calculate it is %v", capacity, time.Since(startTime))
 	return capacity
 }
@@ -445,7 +454,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
 		} else {
 			// Only back off AWs that are in state Running and not in state Failed
 			if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
-				klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
+				klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
 				qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
 			}
 		}
@@ -921,6 +930,63 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s
 	return ""
 }
 
+// calculateRequiredResources computes the additional CPU, memory, and GPU an AppWrapper needs beyond the currently available capacity and logs any shortfall.
+func calculateRequiredResources(namespace string, appwrapperName string, requestedResources string, totalAvailable string) {
+	// Requested resource values
+	requestedCPU := cpuRegex.FindStringSubmatch(requestedResources)
+	requestedMemory := memoryRegex.FindStringSubmatch(requestedResources)
+	requestedGPU := gpuRegex.FindStringSubmatch(requestedResources)
+
+	// Available resource values
+	availableCPU := cpuRegex.FindStringSubmatch(totalAvailable)
+	availableMemory := memoryRegex.FindStringSubmatch(totalAvailable)
+	availableGPU := gpuRegex.FindStringSubmatch(totalAvailable)
+
+	// Convert values to float
+	requestedCPUValue, err := strconv.ParseFloat(requestedCPU[1], 64)
+	if err != nil {
+		klog.Warningf("Error parsing requested CPU value: %v", err)
+	}
+	availableCPUValue, err := strconv.ParseFloat(availableCPU[1], 64)
+	if err != nil {
+		klog.Warningf("Error parsing available CPU value: %v", err)
+	}
+	requestedMemoryValue, err := strconv.ParseFloat(requestedMemory[1], 64)
+	if err != nil {
+		klog.Warningf("Error parsing requested Memory value: %v", err)
+	}
+	availableMemoryValue, err := strconv.ParseFloat(availableMemory[1], 64)
+	if err != nil {
+		klog.Warningf("Error parsing available Memory value: %v", err)
+	}
+	requestedGPUValue, err := strconv.ParseFloat(requestedGPU[1], 64)
+	if err != nil {
+		klog.Warningf("Error parsing requested GPU value: %v", err)
+	}
+	availableGPUValue, err := strconv.ParseFloat(availableGPU[1], 64)
+	if err != nil {
+		klog.Warningf("Error parsing available GPU value: %v", err)
+	}
+
+	// Compute the per-resource shortfall, clamping negative values (a surplus) to zero
+	requiredCPU := requestedCPUValue - availableCPUValue
+	if requiredCPU < 0 {
+		requiredCPU = 0
+	}
+	requiredMemory := requestedMemoryValue - availableMemoryValue
+	if requiredMemory < 0 {
+		requiredMemory = 0
+	}
+	requiredGPU := requestedGPUValue - availableGPUValue
+	if requiredGPU < 0 {
+		requiredGPU = 0
+	}
+	if requiredCPU != 0 || requiredMemory != 0 || requiredGPU != 0 {
+		// Logged unconditionally (no verbosity level): this is important information to surface
+		klog.Infof("[ScheduleNext] Appwrapper '%s/%s' requires additional resources CPU: %f, Memory: %f, GPU: %f", namespace, appwrapperName, requiredCPU, requiredMemory, requiredGPU)
+	}
+}
+
 // Thread to find queue-job(QJ) for next schedule
 func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 	ctx := context.Background()
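Reviewer note: the three package-level regexes assume the textual form produced by `clusterstateapi.Resource.String()`. A minimal, self-contained sketch of the extraction they perform; the sample string is hypothetical and the real `String()` output may differ:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Hypothetical example of the string form the regex expects; the exact
	// output of clusterstateapi.Resource.String() may differ.
	available := "cpu 4000.000000, memory 8589934592.000000, GPU 2"

	cpuRegex := regexp.MustCompile(`cpu (\d+(\.\d+)?)`)

	m := cpuRegex.FindStringSubmatch(available)
	if m == nil {
		// FindStringSubmatch returns nil on no match, so indexing m[1]
		// without this guard would panic on an unexpected format.
		fmt.Println("no cpu field found")
		return
	}
	cpu, err := strconv.ParseFloat(m[1], 64)
	if err != nil {
		fmt.Printf("error parsing CPU value: %v\n", err)
		return
	}
	fmt.Printf("available CPU: %f\n", cpu) // available CPU: 4000.000000
}
```

As the sketch's nil guard suggests, `calculateRequiredResources` would panic on `requestedCPU[1]` if the input string ever stopped matching; adding similar guards may be worth considering.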
err=%#v", retryErr) return err } - klog.V(3).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, qj.ResourceVersion, qj.Status) + klog.V(4).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj.ResourceVersion, qj.Status) apiCacheAWJob, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] -- after dynamic priority pop") if retryErr != nil { if apierrors.IsNotFound(retryErr) { @@ -1005,7 +1071,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { return err } if apiCacheAWJob.Status.CanRun { - klog.Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status) + klog.V(4).Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status) return nil } apiCacheAWJob.DeepCopyInto(qj) @@ -1030,9 +1096,9 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.V(4).Infof("[ScheduleNext] after Pop qjqLength=%d qj %s/%s Version=%s activeQ=%t Unsched=%t Status=%v", qjm.qjqueue.Length(), qj.Namespace, qj.Name, qj.ResourceVersion, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.Status) if qjm.isDispatcher { - klog.Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status) + klog.V(4).Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status) } else { - klog.Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status) } dispatchFailedReason := "AppWrapperNotRunnable." @@ -1110,6 +1176,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority( unallocatedResources, priorityindex, qj, "") klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources) + // call calculateRequiredResources to log the remaining required resources for the app wrapper + calculateRequiredResources(qj.Namespace, qj.Name, aggqj.String(), resources.String()) // Jobs dispatched with quota management may be borrowing quota from other tree nodes making those jobs preemptable, regardless of their priority. 
// Cluster resources need to be considered to determine if both quota and resources (after deleting borrowing AppWrappers) are availabe for the new AppWrapper @@ -1167,15 +1235,15 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Info("%s %s %s", quotaFits, preemptAWs, msg) if quotaFits { - klog.Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + klog.V(4).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) // Set any jobs that are marked for preemption qjm.preemptAWJobs(ctx, preemptAWs) } else { // Not enough free quota to dispatch appwrapper dispatchFailedMessage = "Insufficient quota and/or resources to dispatch AppWrapper." dispatchFailedReason = "quota limit exceeded" - klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) + klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t Version=%s Status=%+v msg=%s", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status, msg) // Call update etcd here to retrigger AW execution for failed quota // TODO: quota management tests fail if this is converted into go-routine, need to inspect why? 
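Reviewer note: the call site above feeds `calculateRequiredResources` the string renderings of the requested and available resources, and the function's arithmetic reduces to max(0, requested - available) per resource, so a surplus is never reported as a negative need. A minimal sketch of that clamp logic in isolation:

```go
package main

import "fmt"

// required returns the shortfall between requested and available,
// clamped at zero so a surplus is not reported as a negative need.
func required(requested, available float64) float64 {
	r := requested - available
	if r < 0 {
		return 0
	}
	return r
}

func main() {
	fmt.Println(required(4.0, 1.5)) // 2.5: 2.5 more CPUs are needed
	fmt.Println(required(2.0, 8.0)) // 0: enough is already available
}
```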
@@ -1167,15 +1235,15 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 				klog.Infof("%t %v %s", quotaFits, preemptAWs, msg)
 				if quotaFits {
-					klog.Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
-						qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+					klog.V(4).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v",
+						qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 					// Set any jobs that are marked for preemption
 					qjm.preemptAWJobs(ctx, preemptAWs)
 				} else { // Not enough free quota to dispatch appwrapper
 					dispatchFailedMessage = "Insufficient quota and/or resources to dispatch AppWrapper."
 					dispatchFailedReason = "quota limit exceeded"
-					klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s",
-						qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg)
+					klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' at %s due to quota limits, activeQ=%t Unsched=%t Version=%s Status=%+v msg=%s",
+						qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status, msg)
 					// Call update etcd here to retrigger AW execution for failed quota
 					// TODO: quota management tests fail if this is converted into a go-routine, need to inspect why
 					qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage)
@@ -1188,19 +1256,19 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 					fits = false
 				}
 			} else {
-				klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace,
-					qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+				klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace,
+					qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 			}
 			if aggqj.LessEqual(resources) { // Check if enough resources to dispatch
 				fits = true
-				klog.Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v.",
-					qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+				klog.V(4).Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v.",
+					qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 			} else { // Not enough free resources to dispatch HOL
 				fits = false
 				dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper."
-				klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
-					qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj),
-					qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+				klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t Version=%s Status=%+v",
+					qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj),
+					qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 				// TODO: Remove forwarded logic as a big AW will never be forwarded
 				forwarded = true
 				// should we call backoff or update etcd?
@@ -1262,8 +1330,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 			fowardingLoopCount += 1
 		}
 		if !forwarded { // start thread to backoff
-			klog.Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
-				qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+			klog.Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t Version=%s Status=%+v",
+				qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 			if qjm.quotaManager != nil && quotaFits {
 				qjm.quotaManager.Release(qj)
 			}
@@ -1401,8 +1469,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
 		klog.Errorf("[backoff] Failed to update status for %s/%s. Continuing with possible stale object without updating conditions. err=%s", q.Namespace, q.Name, err)
 	}
 	qjm.qjqueue.AddUnschedulableIfNotPresent(q)
-	klog.V(3).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", q.Namespace, q.Name,
-		qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status)
+	klog.V(4).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t Version=%s Status=%+v", q.Namespace, q.Name,
+		qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q.ResourceVersion, q.Status)
 	time.Sleep(time.Duration(qjm.config.BackoffTimeOrDefault(defaultBackoffTime)) * time.Second)
 	qjm.qjqueue.MoveToActiveQueueIfExists(q)
@@ -1447,7 +1515,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) {
 		// TODO: should we really return?
 		return
 	}
-	klog.V(6).Infof("[UpdateQueueJobs] %s/%s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Namespace, newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
+	klog.V(6).Infof("[UpdateQueueJobs] %s/%s: qjqueue=%t Version=%s Status=%+v", newjob.Namespace, newjob.Name, qjm.qjqueue.IfExist(newjob), newjob.ResourceVersion, newjob.Status)
 	// set appwrapper status to Complete or RunningHoldCompletion
 	derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob)
@@ -1509,7 +1577,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) {
 			qjm.eventQueue.Delete(updateQj)
 			qjm.qjqueue.Delete(updateQj)
 		}
-		klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
+		klog.V(4).Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeeded: %d, Failed: %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
 			newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
 	}
 }
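Reviewer note: the bulk of this patch demotes unconditional `klog.Infof` calls to leveled `V(2)`/`V(4)`/`V(6)`/`V(10)` logging, so those messages are emitted only when the controller runs with a sufficiently high `-v` flag. A minimal sketch of how the levels gate output, assuming `k8s.io/klog/v2` (the logger used here) and an illustrative flag value:

```go
package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	_ = flag.Set("v", "4") // comparable to running the controller with -v=4
	flag.Parse()

	klog.Infof("always emitted")          // unconditional, like the dispatch-blocked messages
	klog.V(4).Infof("emitted at -v=4+")   // like the demoted scheduling-progress logs
	klog.V(6).Infof("suppressed at -v=4") // like the per-enqueue informer logs
	klog.Flush()
}
```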
@@ -1546,7 +1614,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
 	klog.V(6).Infof("[Informer-addQJ] %s/%s Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s",
 		qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.CreationTimestamp, qj.Status.ControllerFirstTimestamp)
 
-	klog.V(6).Infof("[Informer-addQJ] enqueue %s/%s &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, qj, qj.ResourceVersion, qj.Status)
+	klog.V(6).Infof("[Informer-addQJ] enqueue %s/%s Version=%s Status=%+v", qj.Namespace, qj.Name, qj.ResourceVersion, qj.Status)
 	// Requeue the item to be processed again in 30 seconds.
 	// TODO: tune the frequency of reprocessing an AW
@@ -1759,7 +1827,7 @@ func (cc *XController) agentEventQueueWorker() {
 			return nil
 		}
 
-		klog.V(3).Infof("[Controller: Dispatcher Mode] XQJ Status Update from AGENT: Name:%s, Namespace:%s, Status: %+v\n", queuejob.Name, queuejob.Namespace, queuejob.Status)
+		klog.V(4).Infof("[Controller: Dispatcher Mode] XQJ Status Update from AGENT: Name:%s, Namespace:%s, Status: %+v\n", queuejob.Name, queuejob.Namespace, queuejob.Status)
 
 		// sync AppWrapper
 		if err := cc.updateQueueJobStatus(ctx, queuejob); err != nil {
@@ -1849,7 +1917,7 @@ func (cc *XController) worker() {
 			}
 			queuejob.Status.State = arbv1.AppWrapperStateEnqueued
-			klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status)
+			klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob.ResourceVersion, queuejob.Status)
 			index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondQueueing, "AwaitingHeadOfLine")
 			if index < 0 {
 				queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing
@@ -1947,7 +2015,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e
 			klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", qj.Namespace, qj.Name, err)
 			return err
 		}
-		klog.Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion,
+		klog.V(4).Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeeded: %d, Failed: %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion,
 			awNew.Status.CanRun, awNew.Status.State, awNew.Status.Pending, awNew.Status.Running, awNew.Status.Succeeded, awNew.Status.Failed)
 
 		// Update etcd conditions if the AppWrapper job has at least 1 running pod and is transitioning from dispatched to running.
@@ -2080,7 +2148,7 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
 		}
 		return nil
 	} else if qj.Status.CanRun && qj.Status.State == arbv1.AppWrapperStateActive {
-		klog.Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
+		klog.V(4).Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeeded: %d, Failed: %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
 			qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)
 	} else if podPhaseChanges { // Continued bug fix
@@ -2106,9 +2174,9 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
 		qj.Status.State = arbv1.AppWrapperStateEnqueued
 		if cc.qjqueue.IfExistUnschedulableQ(qj) {
-			klog.V(10).Infof("[manageQueueJob] [Dispatcher] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+			klog.V(10).Infof("[manageQueueJob] [Dispatcher] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 		} else {
-			klog.V(10).Infof("[manageQueueJob] [Dispatcher] before add to activeQ '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
+			klog.V(10).Infof("[manageQueueJob] [Dispatcher] before add to activeQ '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status)
 			qj.Status.QueueJobState = arbv1.AppWrapperCondQueueing
 			qj.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext
 			err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - setQueueing")
diff --git a/pkg/controller/queuejobresources/genericresource/genericresource.go b/pkg/controller/queuejobresources/genericresource/genericresource.go
index b9686153..3cff0416 100644
--- a/pkg/controller/queuejobresources/genericresource/genericresource.go
+++ b/pkg/controller/queuejobresources/genericresource/genericresource.go
@@ -193,9 +193,9 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
 	return name, gvk, err
 }
 
-//SyncQueueJob uses dynamic clients to unwrap (spawn) items inside genericItems block, it is used to create resources inside etcd and return errors when
-//unwrapping fails.
-//More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
+// SyncQueueJob uses dynamic clients to unwrap (spawn) the items inside the genericItems block; it creates the resources inside etcd and returns an error when
+// unwrapping fails.
+// More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
 func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) {
 	startTime := time.Now()
 	defer func() {
@@ -329,7 +329,6 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
 		if errors.IsAlreadyExists(err) {
 			klog.V(4).Infof("%v\n", err.Error())
 		} else {
-			klog.Errorf("Error creating the object `%v`, the error is `%v`", newName, errors.ReasonForError(err))
 			return []*v1.Pod{}, err
 		}
 	}
@@ -437,7 +436,7 @@ func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, conta
 	containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers")
 
 	if !isFound {
-		klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec)
+		klog.V(6).Infof("[hasFields] No containers field found in raw object: %#v", subspec)
 		return false, 0, nil
 	}
 	objContainers := make([]v1.Container, len(containerList))
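Reviewer note: removing the `klog.Errorf` before `return err` follows the common "log the error or return it, not both" convention, so each creation failure is reported once, by the caller that receives the error. A minimal illustration of the idiom with hypothetical names:

```go
package main

import (
	"fmt"
	"log"
)

// createObject returns the error instead of logging it; callers decide how
// to report it, so each failure appears in the log exactly once.
func createObject(name string) error {
	return fmt.Errorf("creating object %q: already exists", name)
}

func main() {
	if err := createObject("demo"); err != nil {
		log.Printf("sync failed: %v", err) // single log site at the caller
	}
}
```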