From 1d3f6d9dc8265f77084977441f20439e81e82865 Mon Sep 17 00:00:00 2001 From: Bobbins228 Date: Thu, 2 Nov 2023 15:28:02 +0000 Subject: [PATCH 1/6] Revised logs at level 3 & 4, added required resources log --- .../queuejob/queuejob_controller_ex.go | 139 ++++++++++++++---- .../genericresource/genericresource.go | 9 +- 2 files changed, 114 insertions(+), 34 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 0b7ca80a..8248daa1 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -24,8 +24,10 @@ import ( "math" "math/rand" "reflect" + "regexp" "runtime/debug" "sort" + "strconv" "strings" "sync" "time" @@ -216,7 +218,7 @@ func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource { } } } - klog.Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime)) + klog.V(4).Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime)) return capacity } @@ -445,7 +447,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { } else { // Only back-off AWs that are in state running and not in state Failed if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { - klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) + klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name) qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) } } @@ -921,6 +923,67 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s return "" } +// Function for calculating required resources to run AppWrapper and printing a log if there is a required increase for resources. 
+func calculateRequiredResources(namespace string, appwrapperName string, requestedResources string, totalAvailable string) { + cpuRegex := regexp.MustCompile(`cpu (\d+(\.\d+)?)`) + memoryRegex := regexp.MustCompile(`memory (\d+(\.\d+)?)`) + gpuRegex := regexp.MustCompile(`GPU (\d+)`) + + // requested resources values // + requestedCPU := cpuRegex.FindStringSubmatch(requestedResources) + requestedMemory := memoryRegex.FindStringSubmatch(requestedResources) + requestedGPU := gpuRegex.FindStringSubmatch(requestedResources) + + // available resources values // + availableCPU := cpuRegex.FindStringSubmatch(totalAvailable) + availableMemory := memoryRegex.FindStringSubmatch(totalAvailable) + availableGPU := gpuRegex.FindStringSubmatch(totalAvailable) + + // convert values to float // + requestedCPUValue, err := strconv.ParseFloat(requestedCPU[1], 64) + if err != nil { + klog.Warningf("Error parsing CPU value: ", err) + } + availableCPUValue, err := strconv.ParseFloat(availableCPU[1], 64) + if err != nil { + klog.Warningf("Error parsing CPU value: ", err) + } + requestedMemoryValue, err := strconv.ParseFloat(requestedMemory[1], 64) + if err != nil { + klog.Warningf("Error parsing Memory value: ", err) + } + availableMemoryValue, err := strconv.ParseFloat(availableMemory[1], 64) + if err != nil { + klog.Warningf("Error parsing Memory value: ", err) + } + requestedGPUValue, err := strconv.ParseFloat(requestedGPU[1], 64) + if err != nil { + klog.Warningf("Error parsing GPU value: ", err) + } + availableGPUValue, err := strconv.ParseFloat(availableGPU[1], 64) + if err != nil { + klog.Warningf("Error parsing GPU value: ", err) + } + + // Get required resources (clamp negative deficits to zero: nothing extra is needed when available covers the request) // + requiredCPU := requestedCPUValue - availableCPUValue + if requiredCPU < 0 { + requiredCPU = 0 + } + requiredMemory := requestedMemoryValue - availableMemoryValue + if requiredMemory < 0 { + requiredMemory = 0 + } + requiredGPU := requestedGPUValue - availableGPUValue + if 
requiredGPU < availableGPUValue { + requiredGPU = 0 + } + if requiredCPU != 0 || requiredMemory != 0 || requiredGPU != 0 { + // This is important information to know hence no log level + klog.Infof("[ScheduleNext] Appwrapper '%s/%s' requires additional resources CPU: %f, Memory: %f, GPU: %f", namespace, appwrapperName, requiredCPU, requiredMemory, requiredGPU) + } +} + // Thread to find queue-job(QJ) for next schedule func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { ctx := context.Background() @@ -937,7 +1000,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { scheduleNextRetrier.SetJitter(0.05) // Retry the execution err = scheduleNextRetrier.Run(func() error { - klog.Infof("[ScheduleNext] activeQ.Pop %s/%s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, + klog.V(6).Infof("[ScheduleNext] activeQ.Pop %s/%s *Delay=%.6f seconds RemainingLength=%d Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj.ResourceVersion, qj.Status) apiCacheAWJob, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] -- get fresh copy after queue pop") @@ -995,7 +1058,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! 
err=%#v", retryErr) return err } - klog.V(3).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, qj.ResourceVersion, qj.Status) + klog.V(3).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length()) + klog.V(4).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj.ResourceVersion, qj.Status) apiCacheAWJob, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] -- after dynamic priority pop") if retryErr != nil { if apierrors.IsNotFound(retryErr) { @@ -1005,7 +1069,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { return err } if apiCacheAWJob.Status.CanRun { - klog.Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status) + klog.V(3).Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request", qj.Namespace, qj.Name) + klog.V(4).Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. 
Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status) return nil } apiCacheAWJob.DeepCopyInto(qj) @@ -1030,9 +1095,11 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.V(4).Infof("[ScheduleNext] after Pop qjqLength=%d qj %s/%s Version=%s activeQ=%t Unsched=%t Status=%v", qjm.qjqueue.Length(), qj.Namespace, qj.Name, qj.ResourceVersion, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.Status) if qjm.isDispatcher { - klog.Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status) + klog.V(3).Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s", qj.Namespace, qj.Name) + klog.V(4).Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status) } else { - klog.Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status) + klog.V(3).Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s'", qj.Namespace, qj.Name) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status) } dispatchFailedReason := "AppWrapperNotRunnable." 
@@ -1110,6 +1177,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority( unallocatedResources, priorityindex, qj, "") klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources) + // call calculateRequiredResources to log the remaining required resources for the app wrapper + calculateRequiredResources(qj.Namespace, qj.Name, aggqj.String(), resources.String()) // Jobs dispatched with quota management may be borrowing quota from other tree nodes making those jobs preemptable, regardless of their priority. // Cluster resources need to be considered to determine if both quota and resources (after deleting borrowing AppWrappers) are availabe for the new AppWrapper @@ -1167,15 +1236,17 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Info("%s %s %s", quotaFits, preemptAWs, msg) if quotaFits { - klog.Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + klog.V(3).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s'", qj.Namespace, qj.Name) + klog.V(4).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) // Set any jobs that are marked for preemption qjm.preemptAWJobs(ctx, preemptAWs) } else { // Not enough free quota to dispatch appwrapper dispatchFailedMessage = "Insufficient quota and/or resources to dispatch AppWrapper." 
dispatchFailedReason = "quota limit exceeded" - klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) + klog.V(3).Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, msg=%s", qj.Namespace, qj.Name, msg) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t Version=%s Status=%+v msg=%s", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status, msg) // Call update etcd here to retrigger AW execution for failed quota // TODO: quota management tests fail if this is converted into go-routine, need to inspect why? 
qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage) @@ -1188,19 +1259,22 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { fits = false } } else { - klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, - qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, + qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) if aggqj.LessEqual(resources) { // Check if enough resources to dispatch fits = true - klog.Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v.", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + klog.V(3).Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s'", qj.Namespace, qj.Name) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v.", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) } else { // Not enough free resources to dispatch HOL fits = false dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." 
- klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", + klog.V(3).Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources", + qj.Namespace, qj.Name) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj), - qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) // TODO: Remove forwarded logic as a big AW will never be forwarded forwarded = true // should we call backoff or update etcd? @@ -1262,8 +1336,10 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { fowardingLoopCount += 1 } if !forwarded { // start thread to backoff - klog.Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + klog.V(3).Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime)) + klog.V(4).Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t Version=%s Status=%+v", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) if qjm.quotaManager != nil && quotaFits { qjm.quotaManager.Release(qj) } @@ -1401,8 +1477,10 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason klog.Errorf("[backoff] Failed to update status for %s/%s. 
Continuing with possible stale object without updating conditions. err=%s", q.Namespace, q.Name, err) } qjm.qjqueue.AddUnschedulableIfNotPresent(q) - klog.V(3).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", q.Namespace, q.Name, - qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status) + klog.V(3).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. Unsched=%t", q.Namespace, q.Name, + qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistUnschedulableQ(q)) + klog.V(4).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t Version=%s Status=%+v", q.Namespace, q.Name, + qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q.ResourceVersion, q.Status) time.Sleep(time.Duration(qjm.config.BackoffTimeOrDefault(defaultBackoffTime)) * time.Second) qjm.qjqueue.MoveToActiveQueueIfExists(q) @@ -1447,7 +1525,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { // TODO: should we really return? 
return } - klog.V(6).Infof("[UpdateQueueJobs] %s/%s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Namespace, newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status) + klog.V(6).Infof("[UpdateQueueJobs] %s/%s: qjqueue=%t Version=%s Status=%+v", newjob.Namespace, newjob.Name, qjm.qjqueue.IfExist(newjob), newjob.ResourceVersion, newjob.Status) // set appwrapper status to Complete or RunningHoldCompletion derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob) @@ -1509,7 +1587,9 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { qjm.eventQueue.Delete(updateQj) qjm.qjqueue.Delete(updateQj) } - klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, + klog.V(3).Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Status.CanRun=%t Status.State=%s", newjob.Namespace, newjob.Name, + newjob.Status.CanRun, newjob.Status.State) + klog.V(4).Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) } } @@ -1546,7 +1626,7 @@ func (cc *XController) addQueueJob(obj interface{}) { klog.V(6).Infof("[Informer-addQJ] %s/%s Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.CreationTimestamp, qj.Status.ControllerFirstTimestamp) - klog.V(6).Infof("[Informer-addQJ] enqueue %s/%s &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, qj, qj.ResourceVersion, qj.Status) + 
klog.V(6).Infof("[Informer-addQJ] enqueue %s/%s Version=%s Status=%+v", qj.Namespace, qj.Name, qj.ResourceVersion, qj.Status) // Requeue the item to be processed again in 30 seconds. // TODO: tune the frequency of reprocessing an AW @@ -1849,7 +1929,7 @@ func (cc *XController) worker() { } queuejob.Status.State = arbv1.AppWrapperStateEnqueued - klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status) + klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob.ResourceVersion, queuejob.Status) index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondQueueing, "AwaitingHeadOfLine") if index < 0 { queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing @@ -1947,7 +2027,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s/%s, err=%+v", qj.Namespace, qj.Name, err) return err } - klog.Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion, + klog.V(4).Infof("[syncQueueJob] Pod counts updated for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", awNew.Namespace, awNew.Name, awNew.ResourceVersion, awNew.Status.CanRun, awNew.Status.State, awNew.Status.Pending, awNew.Status.Running, awNew.Status.Succeeded, awNew.Status.Failed) // Update etcd conditions if AppWrapper Job has at least 1 running pod and transitioning from 
dispatched to running. @@ -2080,7 +2160,8 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, } return nil } else if qj.Status.CanRun && qj.Status.State == arbv1.AppWrapperStateActive { - klog.Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion, + klog.V(3).Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Status.CanRun=%t Status.State=%s", qj.Namespace, qj.Name, qj.Status.CanRun, qj.Status.State) + klog.V(4).Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion, qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed) } else if podPhaseChanges { // Continued bug fix @@ -2106,9 +2187,9 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, qj.Status.State = arbv1.AppWrapperStateEnqueued if cc.qjqueue.IfExistUnschedulableQ(qj) { - klog.V(10).Infof("[manageQueueJob] [Dispatcher] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + klog.V(10).Infof("[manageQueueJob] [Dispatcher] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) } else { - klog.V(10).Infof("[manageQueueJob] [Dispatcher] before add to activeQ '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, 
qj.ResourceVersion, qj.Status) + klog.V(10).Infof("[manageQueueJob] [Dispatcher] before add to activeQ '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) qj.Status.QueueJobState = arbv1.AppWrapperCondQueueing qj.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext err := cc.updateStatusInEtcdWithRetry(ctx, qj, "manageQueueJob - setQueueing") diff --git a/pkg/controller/queuejobresources/genericresource/genericresource.go b/pkg/controller/queuejobresources/genericresource/genericresource.go index b9686153..3cff0416 100644 --- a/pkg/controller/queuejobresources/genericresource/genericresource.go +++ b/pkg/controller/queuejobresources/genericresource/genericresource.go @@ -193,9 +193,9 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG return name, gvk, err } -//SyncQueueJob uses dynamic clients to unwrap (spawn) items inside genericItems block, it is used to create resources inside etcd and return errors when -//unwrapping fails. -//More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598 +// SyncQueueJob uses dynamic clients to unwrap (spawn) items inside genericItems block, it is used to create resources inside etcd and return errors when +// unwrapping fails. 
+// More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598 func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) { startTime := time.Now() defer func() { @@ -329,7 +329,6 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra if errors.IsAlreadyExists(err) { klog.V(4).Infof("%v\n", err.Error()) } else { - klog.Errorf("Error creating the object `%v`, the error is `%v`", newName, errors.ReasonForError(err)) return []*v1.Pod{}, err } } @@ -437,7 +436,7 @@ func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, conta containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers") if !isFound { - klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec) + klog.V(6).Infof("[hasFields] No containers field found in raw object: %#v", subspec) return false, 0, nil } objContainers := make([]v1.Container, len(containerList)) From 7c1244abb7217fab45b5fa709e84f3ded42b2eaf Mon Sep 17 00:00:00 2001 From: Bobbins228 Date: Mon, 6 Nov 2023 14:26:17 +0000 Subject: [PATCH 2/6] Removed potentially duplicated logs updated warning-type logs --- .../queuejob/queuejob_controller_ex.go | 24 ++++--------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 8248daa1..f1536c19 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1058,7 +1058,6 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! 
err=%#v", retryErr) return err } - klog.V(3).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length()) klog.V(4).Infof("[ScheduleNext] activeQ.Pop_afterPriorityUpdate %s/%s *Delay=%.6f seconds RemainingLength=%d Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj.ResourceVersion, qj.Status) apiCacheAWJob, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] -- after dynamic priority pop") if retryErr != nil { @@ -1069,7 +1068,6 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { return err } if apiCacheAWJob.Status.CanRun { - klog.V(3).Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. Ignoring request", qj.Namespace, qj.Name) klog.V(4).Infof("[ScheduleNext] AppWrapper job: %s/%s from API is already scheduled. 
Ignoring request: Status=%+v", qj.Namespace, qj.Name, qj.Status) return nil } @@ -1095,10 +1093,8 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.V(4).Infof("[ScheduleNext] after Pop qjqLength=%d qj %s/%s Version=%s activeQ=%t Unsched=%t Status=%v", qjm.qjqueue.Length(), qj.Namespace, qj.Name, qj.ResourceVersion, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.Status) if qjm.isDispatcher { - klog.V(3).Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s", qj.Namespace, qj.Name) klog.V(4).Infof("[ScheduleNext] [Dispatcher Mode] Attempting to dispatch next appwrapper: '%s/%s Status=%v", qj.Namespace, qj.Name, qj.Status) } else { - klog.V(3).Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s'", qj.Namespace, qj.Name) klog.V(4).Infof("[ScheduleNext] [Agent Mode] Attempting to dispatch next appwrapper: '%s/%s' Status=%v", qj.Namespace, qj.Name, qj.Status) } @@ -1236,7 +1232,6 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Info("%s %s %s", quotaFits, preemptAWs, msg) if quotaFits { - klog.V(3).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s'", qj.Namespace, qj.Name) klog.V(4).Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) // Set any jobs that are marked for preemption @@ -1244,8 +1239,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { } else { // Not enough free quota to dispatch appwrapper dispatchFailedMessage = "Insufficient quota and/or resources to dispatch AppWrapper." 
dispatchFailedReason = "quota limit exceeded" - klog.V(3).Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, msg=%s", qj.Namespace, qj.Name, msg) - klog.V(4).Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t Version=%s Status=%+v msg=%s", + klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t Version=%s Status=%+v msg=%s", qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status, msg) // Call update etcd here to retrigger AW execution for failed quota // TODO: quota management tests fail if this is converted into go-routine, need to inspect why? @@ -1264,15 +1258,12 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { if aggqj.LessEqual(resources) { // Check if enough resources to dispatch fits = true - klog.V(3).Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s'", qj.Namespace, qj.Name) klog.V(4).Infof("[ScheduleNext] [Agent Mode] available resource successful check for '%s/%s' at %s activeQ=%t Unsched=%t Version=%s Status=%+v.", qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) } else { // Not enough free resources to dispatch HOL fits = false dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." 
- klog.V(3).Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources", - qj.Namespace, qj.Name) - klog.V(4).Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t Version=%s Status=%+v", + klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insufficient resources, activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) // TODO: Remove forwarded logic as a big AW will never be forwarded @@ -1336,9 +1327,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { fowardingLoopCount += 1 } if !forwarded { // start thread to backoff - klog.V(3).Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime)) - klog.V(4).Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t Version=%s Status=%+v", + klog.Infof("[ScheduleNext] [Agent Mode] backing off app wrapper '%s/%s' after waiting for %s activeQ=%t Unsched=%t Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.ResourceVersion, qj.Status) if qjm.quotaManager != nil && quotaFits { qjm.quotaManager.Release(qj) @@ -1477,8 +1466,6 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason klog.Errorf("[backoff] Failed to update status for %s/%s. Continuing with possible stale object without updating conditions. err=%s", q.Namespace, q.Name, err) } qjm.qjqueue.AddUnschedulableIfNotPresent(q) - klog.V(3).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. 
Unsched=%t", q.Namespace, q.Name, - qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistUnschedulableQ(q)) klog.V(4).Infof("[backoff] %s/%s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t Version=%s Status=%+v", q.Namespace, q.Name, qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q.ResourceVersion, q.Status) time.Sleep(time.Duration(qjm.config.BackoffTimeOrDefault(defaultBackoffTime)) * time.Second) @@ -1587,8 +1574,6 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { qjm.eventQueue.Delete(updateQj) qjm.qjqueue.Delete(updateQj) } - klog.V(3).Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Status.CanRun=%t Status.State=%s", newjob.Namespace, newjob.Name, - newjob.Status.CanRun, newjob.Status.State) klog.V(4).Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) } @@ -1839,7 +1824,7 @@ func (cc *XController) agentEventQueueWorker() { return nil } - klog.V(3).Infof("[Controller: Dispatcher Mode] XQJ Status Update from AGENT: Name:%s, Namespace:%s, Status: %+v\n", queuejob.Name, queuejob.Namespace, queuejob.Status) + klog.V(4).Infof("[Controller: Dispatcher Mode] XQJ Status Update from AGENT: Name:%s, Namespace:%s, Status: %+v\n", queuejob.Name, queuejob.Namespace, queuejob.Status) // sync AppWrapper if err := cc.updateQueueJobStatus(ctx, queuejob); err != nil { @@ -2160,7 +2145,6 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper, } return nil } else if qj.Status.CanRun && qj.Status.State == arbv1.AppWrapperStateActive { - 
klog.V(3).Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Status.CanRun=%t Status.State=%s", qj.Namespace, qj.Name, qj.Status.CanRun, qj.Status.State) klog.V(4).Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion, qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed) From d94454dd88255abff59c6feb25db21aabb7bed76 Mon Sep 17 00:00:00 2001 From: Bobbins228 Date: Tue, 7 Nov 2023 09:45:51 +0000 Subject: [PATCH 3/6] Updated comments --- pkg/controller/queuejob/queuejob_controller_ex.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index f1536c19..f06d2326 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -929,17 +929,17 @@ func calculateRequiredResources(namespace string, appwrapperName string, request memoryRegex := regexp.MustCompile(`memory (\d+(\.\d+)?)`) gpuRegex := regexp.MustCompile(`GPU (\d+)`) - // requested resources values // + // Requested resources values // requestedCPU := cpuRegex.FindStringSubmatch(requestedResources) requestedMemory := memoryRegex.FindStringSubmatch(requestedResources) requestedGPU := gpuRegex.FindStringSubmatch(requestedResources) - // available resources values // + // Available resources values // availableCPU := cpuRegex.FindStringSubmatch(totalAvailable) availableMemory := memoryRegex.FindStringSubmatch(totalAvailable) availableGPU := gpuRegex.FindStringSubmatch(totalAvailable) - // convert values to float // + // Convert values to float // requestedCPUValue, err := strconv.ParseFloat(requestedCPU[1], 64) if err != nil { klog.Warningf("Error parsing CPU value: ", err) From 
b189216c0e1b97e7bd229423727734e9885127fb Mon Sep 17 00:00:00 2001 From: Bobbins228 Date: Wed, 8 Nov 2023 15:45:57 +0000 Subject: [PATCH 4/6] Made Regex variables global --- pkg/controller/queuejob/queuejob_controller_ex.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index f06d2326..bf4584b4 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -66,6 +66,13 @@ import ( // defaultBackoffTime is the default backoff time in seconds const defaultBackoffTime = 20 +var ( + // Regex variables for extracting resource values + cpuRegex = regexp.MustCompile(`cpu (\d+(\.\d+)?)`) + memoryRegex = regexp.MustCompile(`memory (\d+(\.\d+)?)`) + gpuRegex = regexp.MustCompile(`GPU (\d+)`) +) + // XController the AppWrapper Controller type type XController struct { // MCAD configuration @@ -925,10 +932,6 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s // Function for calculating required resources to run AppWrapper and printing a log if there is a required increase for resources. 
func calculateRequiredResources(namespace string, appwrapperName string, requestedResources string, totalAvailable string) { - cpuRegex := regexp.MustCompile(`cpu (\d+(\.\d+)?)`) - memoryRegex := regexp.MustCompile(`memory (\d+(\.\d+)?)`) - gpuRegex := regexp.MustCompile(`GPU (\d+)`) - // Requested resources values // requestedCPU := cpuRegex.FindStringSubmatch(requestedResources) requestedMemory := memoryRegex.FindStringSubmatch(requestedResources) From fb1aaed5113b31df54dc4fef1774e7de1f17befa Mon Sep 17 00:00:00 2001 From: Bobbins228 Date: Thu, 23 Nov 2023 09:39:53 +0000 Subject: [PATCH 5/6] Updated comment --- pkg/controller/queuejob/queuejob_controller_ex.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index bf4584b4..15cda830 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -68,9 +68,9 @@ const defaultBackoffTime = 20 var ( // Regex variables for extracting resource values - cpuRegex = regexp.MustCompile(`cpu (\d+(\.\d+)?)`) + cpuRegex = regexp.MustCompile(`cpu (\d+(\.\d+)?)`) memoryRegex = regexp.MustCompile(`memory (\d+(\.\d+)?)`) - gpuRegex = regexp.MustCompile(`GPU (\d+)`) + gpuRegex = regexp.MustCompile(`GPU (\d+)`) ) // XController the AppWrapper Controller type @@ -930,7 +930,7 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s return "" } -// Function for calculating required resources to run AppWrapper and printing a log if there is a required increase for resources. +// Function for calculating required resources to run AppWrapper and logging if there is a required increase for resources. 
func calculateRequiredResources(namespace string, appwrapperName string, requestedResources string, totalAvailable string) { // Requested resources values // requestedCPU := cpuRegex.FindStringSubmatch(requestedResources) From 6fbbd696fcfdd267645bf9305b9e5751c495a61b Mon Sep 17 00:00:00 2001 From: Bobbins228 Date: Tue, 28 Nov 2023 10:19:23 +0000 Subject: [PATCH 6/6] Review changes: allocatableCapacity log level --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 15cda830..1ca27732 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -225,7 +225,7 @@ func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource { } } } - klog.V(4).Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime)) + klog.V(2).Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Since(startTime)) return capacity }