diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 333071ad6..245e56da7 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -972,178 +972,62 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } defer metrics.MeasureSince([]string{"nomad", "job", "scale"}, time.Now()) - // Validate the arguments - namespace := args.Target[structs.ScalingTargetNamespace] - jobID := args.Target[structs.ScalingTargetJob] - groupName := args.Target[structs.ScalingTargetGroup] - if namespace != "" && namespace != args.RequestNamespace() { - return structs.NewErrRPCCoded(400, "namespace in payload did not match header") - } else if namespace == "" { - namespace = args.RequestNamespace() - } - if jobID != "" && jobID != args.JobID { - return fmt.Errorf("job ID in payload did not match URL") - } - if groupName == "" { - return structs.NewErrRPCCoded(400, "missing task group name for scaling action") - } - if args.Error && args.Count != nil { - return structs.NewErrRPCCoded(400, "scaling action should not contain count if error is true") - } - if args.Count != nil && *args.Count < 0 { - return structs.NewErrRPCCoded(400, "scaling action count can't be negative") + namespace := args.RequestNamespace() + + // Authorize request + aclObj, err := j.srv.ResolveToken(args.AuthToken) + if err != nil { + return err } - // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { - return err - } else if aclObj != nil { - hasScaleJob := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityScaleJob) - hasSubmitJob := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) + if aclObj != nil { + hasScaleJob := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityScaleJob) + hasSubmitJob := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob) if !(hasScaleJob || hasSubmitJob) { return structs.ErrPermissionDenied } } - // Lookup the job + // Validate args + err = args.Validate() + if err != nil { + return err + } + + // Find job snap, err := j.srv.fsm.State().Snapshot() if err != nil { return err } + ws := memdb.NewWatchSet() job, err := snap.JobByID(ws, namespace, args.JobID) if err != nil { j.logger.Error("unable to lookup job", "error", err) return err } + if job == nil { return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID)) } - var found *structs.TaskGroup + // Find target group in job TaskGroups + groupName := args.Target[structs.ScalingTargetGroup] + var group *structs.TaskGroup for _, tg := range job.TaskGroups { - if groupName == tg.Name { - found = tg + if tg.Name == groupName { + group = tg break } } - if found == nil { + + if group == nil { return structs.NewErrRPCCoded(400, fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName)) } now := time.Now().UnixNano() - - // If the count is present, commit the job update via Raft - // for now, we'll do this even if count didn't change - prevCount := found.Count - if args.Count != nil { - - // if there is a scaling policy, check that the new count is within bounds - if found.Scaling != nil { - if *args.Count < found.Scaling.Min { - return structs.NewErrRPCCoded(400, - fmt.Sprintf("group count was less than scaling policy minimum: %d < %d", - *args.Count, found.Scaling.Min)) - } - if found.Scaling.Max < *args.Count { - return structs.NewErrRPCCoded(400, - fmt.Sprintf("group count was greater than scaling policy maximum: %d > %d", - *args.Count, found.Scaling.Max)) - } - } - - // Lookup the latest deployment, to see whether this scaling event should be blocked - d, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID) - if err != nil { - j.logger.Error("unable to lookup latest deployment", "error", err) - return err - } - // explicitly filter deployment by JobCreateIndex to be safe, because LatestDeploymentByJobID doesn't - if d != nil && d.JobCreateIndex == job.CreateIndex && d.Active() { - // attempt to register the scaling event - JobScalingBlockedByActiveDeployment := "job scaling blocked due to active deployment" - event := &structs.ScalingEventRequest{ - Namespace: job.Namespace, - JobID: job.ID, - TaskGroup: groupName, - ScalingEvent: &structs.ScalingEvent{ - Time: now, - PreviousCount: int64(prevCount), - Message: JobScalingBlockedByActiveDeployment, - Error: true, - Meta: map[string]interface{}{ - "OriginalMessage": args.Message, - "OriginalCount": *args.Count, - "OriginalMeta": args.Meta, - }, - }, - } - if _, _, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event); err != nil { - // just log the error, this was a best-effort attempt - j.logger.Error("scaling event create failed during block scaling action", "error", err) - } - return structs.NewErrRPCCoded(400, JobScalingBlockedByActiveDeployment) - } - - truncCount := int(*args.Count) - if int64(truncCount) != *args.Count { - return structs.NewErrRPCCoded(400, - fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count)) - } - // update the task group count - found.Count = truncCount - - registerReq := structs.JobRegisterRequest{ - Job: job, - EnforceIndex: true, - JobModifyIndex: job.ModifyIndex, - PolicyOverride: args.PolicyOverride, - WriteRequest: args.WriteRequest, - } - _, jobModifyIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, registerReq) - if err != nil { - j.logger.Error("job register for scale failed", "error", err) - return err - } - reply.JobModifyIndex = jobModifyIndex - } else { - reply.JobModifyIndex = job.ModifyIndex - } - - // Only create an eval for non-dispatch jobs and if the count was provided - // for now, we'll do this even if count didn't change - if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil { - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerScaling, - JobID: args.JobID, - JobModifyIndex: reply.JobModifyIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } - - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "scale") - return err - } - - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - } else { - reply.EvalID = "" - reply.EvalCreateIndex = 0 - } + prevCount := int64(group.Count) event := &structs.ScalingEventRequest{ Namespace: job.Namespace, @@ -1151,16 +1035,119 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes TaskGroup: groupName, ScalingEvent: &structs.ScalingEvent{ Time: now, - PreviousCount: int64(prevCount), + PreviousCount: prevCount, Count: args.Count, Message: args.Message, Error: args.Error, Meta: args.Meta, }, } - if reply.EvalID != "" { - event.ScalingEvent.EvalID = &reply.EvalID + + if args.Count != nil { + // Further validation for count-based scaling event + if group.Scaling != nil { + if *args.Count < group.Scaling.Min { + return structs.NewErrRPCCoded(400, + fmt.Sprintf("group count was less than scaling policy minimum: %d < %d", + *args.Count, group.Scaling.Min)) + } + if group.Scaling.Max < *args.Count { + return structs.NewErrRPCCoded(400, + fmt.Sprintf("group count was greater than scaling policy maximum: %d > %d", + *args.Count, group.Scaling.Max)) + } + } + + // Update group count + group.Count = int(*args.Count) + + // Block scaling event if there's an active deployment + deployment, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID) + if err != nil { + j.logger.Error("unable to lookup latest deployment", "error", err) + return err + } + + if deployment != nil && deployment.Active() && deployment.JobCreateIndex == job.CreateIndex { + msg := "job scaling blocked due to active deployment" + _, _, err := j.srv.raftApply( + structs.ScalingEventRegisterRequestType, + &structs.ScalingEventRequest{ + Namespace: job.Namespace, + JobID: job.ID, + TaskGroup: groupName, + ScalingEvent: &structs.ScalingEvent{ + Time: now, + PreviousCount: prevCount, + Message: msg, + Error: true, + Meta: map[string]interface{}{ + "OriginalMessage": args.Message, + "OriginalCount": *args.Count, + "OriginalMeta": args.Meta, + }, + }, + }, + ) + if err != nil { + // just log the error, this was a best-effort attempt + j.logger.Error("scaling event create failed during block scaling action", "error", err) + } + return structs.NewErrRPCCoded(400, msg) + } + + // Commit the job update + _, jobModifyIndex, err := j.srv.raftApply( + structs.JobRegisterRequestType, + structs.JobRegisterRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: job.ModifyIndex, + PolicyOverride: args.PolicyOverride, + WriteRequest: args.WriteRequest, + }, + ) + if err != nil { + j.logger.Error("job register for scale failed", "error", err) + return err + } + reply.JobModifyIndex = jobModifyIndex + + // Create an eval for non-dispatch jobs + if !(job.IsPeriodic() || job.IsParameterized()) { + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: namespace, + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerScaling, + JobID: args.JobID, + JobModifyIndex: reply.JobModifyIndex, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + + _, evalIndex, err := j.srv.raftApply( + structs.EvalUpdateRequestType, + &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + }, + ) + if err != nil { + j.logger.Error("eval create failed", "error", err, "method", "scale") + return err + } + + reply.EvalID = eval.ID + reply.EvalCreateIndex = evalIndex + event.ScalingEvent.EvalID = &reply.EvalID + } + } else { + reply.JobModifyIndex = job.ModifyIndex } + _, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event) if err != nil { j.logger.Error("scaling event create failed", "error", err) @@ -1168,7 +1155,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } reply.Index = eventIndex + j.srv.setQueryMeta(&reply.QueryMeta) + return nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9cbfe22b8..fd4000c51 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -715,6 +715,42 @@ type JobScaleRequest struct { WriteRequest } +// Validate is used to validate the arguments in the request +func (r *JobScaleRequest) Validate() error { + namespace := r.Target[ScalingTargetNamespace] + if namespace != "" && namespace != r.RequestNamespace() { + return NewErrRPCCoded(400, "namespace in payload did not match header") + } + + jobID := r.Target[ScalingTargetJob] + if jobID != "" && jobID != r.JobID { + return fmt.Errorf("job ID in payload did not match URL") + } + + groupName := r.Target[ScalingTargetGroup] + if groupName == "" { + return NewErrRPCCoded(400, "missing task group name for scaling action") + } + + if r.Count != nil { + if *r.Count < 0 { + return NewErrRPCCoded(400, "scaling action count can't be negative") + } + + if r.Error { + return NewErrRPCCoded(400, "scaling action should not contain count if error is true") + } + + truncCount := int(*r.Count) + if int64(truncCount) != *r.Count { + return NewErrRPCCoded(400, + fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", r.Count)) + } + } + + return nil +} + // JobSummaryRequest is used when we just need to get a specific job summary type JobSummaryRequest struct { JobID string