mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Refactor Job.Scale() (#9771)
This commit is contained in:
@@ -972,178 +972,62 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "job", "scale"}, time.Now())
|
||||
|
||||
// Validate the arguments
|
||||
namespace := args.Target[structs.ScalingTargetNamespace]
|
||||
jobID := args.Target[structs.ScalingTargetJob]
|
||||
groupName := args.Target[structs.ScalingTargetGroup]
|
||||
if namespace != "" && namespace != args.RequestNamespace() {
|
||||
return structs.NewErrRPCCoded(400, "namespace in payload did not match header")
|
||||
} else if namespace == "" {
|
||||
namespace = args.RequestNamespace()
|
||||
}
|
||||
if jobID != "" && jobID != args.JobID {
|
||||
return fmt.Errorf("job ID in payload did not match URL")
|
||||
}
|
||||
if groupName == "" {
|
||||
return structs.NewErrRPCCoded(400, "missing task group name for scaling action")
|
||||
}
|
||||
if args.Error && args.Count != nil {
|
||||
return structs.NewErrRPCCoded(400, "scaling action should not contain count if error is true")
|
||||
}
|
||||
if args.Count != nil && *args.Count < 0 {
|
||||
return structs.NewErrRPCCoded(400, "scaling action count can't be negative")
|
||||
namespace := args.RequestNamespace()
|
||||
|
||||
// Authorize request
|
||||
aclObj, err := j.srv.ResolveToken(args.AuthToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check for submit-job permissions
|
||||
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
} else if aclObj != nil {
|
||||
hasScaleJob := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityScaleJob)
|
||||
hasSubmitJob := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob)
|
||||
if aclObj != nil {
|
||||
hasScaleJob := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityScaleJob)
|
||||
hasSubmitJob := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob)
|
||||
if !(hasScaleJob || hasSubmitJob) {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup the job
|
||||
// Validate args
|
||||
err = args.Validate()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Find job
|
||||
snap, err := j.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
job, err := snap.JobByID(ws, namespace, args.JobID)
|
||||
if err != nil {
|
||||
j.logger.Error("unable to lookup job", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if job == nil {
|
||||
return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID))
|
||||
}
|
||||
|
||||
var found *structs.TaskGroup
|
||||
// Find target group in job TaskGroups
|
||||
groupName := args.Target[structs.ScalingTargetGroup]
|
||||
var group *structs.TaskGroup
|
||||
for _, tg := range job.TaskGroups {
|
||||
if groupName == tg.Name {
|
||||
found = tg
|
||||
if tg.Name == groupName {
|
||||
group = tg
|
||||
break
|
||||
}
|
||||
}
|
||||
if found == nil {
|
||||
|
||||
if group == nil {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName))
|
||||
}
|
||||
|
||||
now := time.Now().UnixNano()
|
||||
|
||||
// If the count is present, commit the job update via Raft
|
||||
// for now, we'll do this even if count didn't change
|
||||
prevCount := found.Count
|
||||
if args.Count != nil {
|
||||
|
||||
// if there is a scaling policy, check that the new count is within bounds
|
||||
if found.Scaling != nil {
|
||||
if *args.Count < found.Scaling.Min {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("group count was less than scaling policy minimum: %d < %d",
|
||||
*args.Count, found.Scaling.Min))
|
||||
}
|
||||
if found.Scaling.Max < *args.Count {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("group count was greater than scaling policy maximum: %d > %d",
|
||||
*args.Count, found.Scaling.Max))
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup the latest deployment, to see whether this scaling event should be blocked
|
||||
d, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID)
|
||||
if err != nil {
|
||||
j.logger.Error("unable to lookup latest deployment", "error", err)
|
||||
return err
|
||||
}
|
||||
// explicitly filter deployment by JobCreateIndex to be safe, because LatestDeploymentByJobID doesn't
|
||||
if d != nil && d.JobCreateIndex == job.CreateIndex && d.Active() {
|
||||
// attempt to register the scaling event
|
||||
JobScalingBlockedByActiveDeployment := "job scaling blocked due to active deployment"
|
||||
event := &structs.ScalingEventRequest{
|
||||
Namespace: job.Namespace,
|
||||
JobID: job.ID,
|
||||
TaskGroup: groupName,
|
||||
ScalingEvent: &structs.ScalingEvent{
|
||||
Time: now,
|
||||
PreviousCount: int64(prevCount),
|
||||
Message: JobScalingBlockedByActiveDeployment,
|
||||
Error: true,
|
||||
Meta: map[string]interface{}{
|
||||
"OriginalMessage": args.Message,
|
||||
"OriginalCount": *args.Count,
|
||||
"OriginalMeta": args.Meta,
|
||||
},
|
||||
},
|
||||
}
|
||||
if _, _, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event); err != nil {
|
||||
// just log the error, this was a best-effort attempt
|
||||
j.logger.Error("scaling event create failed during block scaling action", "error", err)
|
||||
}
|
||||
return structs.NewErrRPCCoded(400, JobScalingBlockedByActiveDeployment)
|
||||
}
|
||||
|
||||
truncCount := int(*args.Count)
|
||||
if int64(truncCount) != *args.Count {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count))
|
||||
}
|
||||
// update the task group count
|
||||
found.Count = truncCount
|
||||
|
||||
registerReq := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
EnforceIndex: true,
|
||||
JobModifyIndex: job.ModifyIndex,
|
||||
PolicyOverride: args.PolicyOverride,
|
||||
WriteRequest: args.WriteRequest,
|
||||
}
|
||||
_, jobModifyIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, registerReq)
|
||||
if err != nil {
|
||||
j.logger.Error("job register for scale failed", "error", err)
|
||||
return err
|
||||
}
|
||||
reply.JobModifyIndex = jobModifyIndex
|
||||
} else {
|
||||
reply.JobModifyIndex = job.ModifyIndex
|
||||
}
|
||||
|
||||
// Only create an eval for non-dispatch jobs and if the count was provided
|
||||
// for now, we'll do this even if count didn't change
|
||||
if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil {
|
||||
eval := &structs.Evaluation{
|
||||
ID: uuid.Generate(),
|
||||
Namespace: args.RequestNamespace(),
|
||||
Priority: structs.JobDefaultPriority,
|
||||
Type: structs.JobTypeService,
|
||||
TriggeredBy: structs.EvalTriggerScaling,
|
||||
JobID: args.JobID,
|
||||
JobModifyIndex: reply.JobModifyIndex,
|
||||
Status: structs.EvalStatusPending,
|
||||
CreateTime: now,
|
||||
ModifyTime: now,
|
||||
}
|
||||
update := &structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
WriteRequest: structs.WriteRequest{Region: args.Region},
|
||||
}
|
||||
|
||||
// Commit this evaluation via Raft
|
||||
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
|
||||
if err != nil {
|
||||
j.logger.Error("eval create failed", "error", err, "method", "scale")
|
||||
return err
|
||||
}
|
||||
|
||||
reply.EvalID = eval.ID
|
||||
reply.EvalCreateIndex = evalIndex
|
||||
} else {
|
||||
reply.EvalID = ""
|
||||
reply.EvalCreateIndex = 0
|
||||
}
|
||||
prevCount := int64(group.Count)
|
||||
|
||||
event := &structs.ScalingEventRequest{
|
||||
Namespace: job.Namespace,
|
||||
@@ -1151,16 +1035,119 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
|
||||
TaskGroup: groupName,
|
||||
ScalingEvent: &structs.ScalingEvent{
|
||||
Time: now,
|
||||
PreviousCount: int64(prevCount),
|
||||
PreviousCount: prevCount,
|
||||
Count: args.Count,
|
||||
Message: args.Message,
|
||||
Error: args.Error,
|
||||
Meta: args.Meta,
|
||||
},
|
||||
}
|
||||
if reply.EvalID != "" {
|
||||
event.ScalingEvent.EvalID = &reply.EvalID
|
||||
|
||||
if args.Count != nil {
|
||||
// Further validation for count-based scaling event
|
||||
if group.Scaling != nil {
|
||||
if *args.Count < group.Scaling.Min {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("group count was less than scaling policy minimum: %d < %d",
|
||||
*args.Count, group.Scaling.Min))
|
||||
}
|
||||
if group.Scaling.Max < *args.Count {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("group count was greater than scaling policy maximum: %d > %d",
|
||||
*args.Count, group.Scaling.Max))
|
||||
}
|
||||
}
|
||||
|
||||
// Update group count
|
||||
group.Count = int(*args.Count)
|
||||
|
||||
// Block scaling event if there's an active deployment
|
||||
deployment, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID)
|
||||
if err != nil {
|
||||
j.logger.Error("unable to lookup latest deployment", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if deployment != nil && deployment.Active() && deployment.JobCreateIndex == job.CreateIndex {
|
||||
msg := "job scaling blocked due to active deployment"
|
||||
_, _, err := j.srv.raftApply(
|
||||
structs.ScalingEventRegisterRequestType,
|
||||
&structs.ScalingEventRequest{
|
||||
Namespace: job.Namespace,
|
||||
JobID: job.ID,
|
||||
TaskGroup: groupName,
|
||||
ScalingEvent: &structs.ScalingEvent{
|
||||
Time: now,
|
||||
PreviousCount: prevCount,
|
||||
Message: msg,
|
||||
Error: true,
|
||||
Meta: map[string]interface{}{
|
||||
"OriginalMessage": args.Message,
|
||||
"OriginalCount": *args.Count,
|
||||
"OriginalMeta": args.Meta,
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
// just log the error, this was a best-effort attempt
|
||||
j.logger.Error("scaling event create failed during block scaling action", "error", err)
|
||||
}
|
||||
return structs.NewErrRPCCoded(400, msg)
|
||||
}
|
||||
|
||||
// Commit the job update
|
||||
_, jobModifyIndex, err := j.srv.raftApply(
|
||||
structs.JobRegisterRequestType,
|
||||
structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
EnforceIndex: true,
|
||||
JobModifyIndex: job.ModifyIndex,
|
||||
PolicyOverride: args.PolicyOverride,
|
||||
WriteRequest: args.WriteRequest,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
j.logger.Error("job register for scale failed", "error", err)
|
||||
return err
|
||||
}
|
||||
reply.JobModifyIndex = jobModifyIndex
|
||||
|
||||
// Create an eval for non-dispatch jobs
|
||||
if !(job.IsPeriodic() || job.IsParameterized()) {
|
||||
eval := &structs.Evaluation{
|
||||
ID: uuid.Generate(),
|
||||
Namespace: namespace,
|
||||
Priority: structs.JobDefaultPriority,
|
||||
Type: structs.JobTypeService,
|
||||
TriggeredBy: structs.EvalTriggerScaling,
|
||||
JobID: args.JobID,
|
||||
JobModifyIndex: reply.JobModifyIndex,
|
||||
Status: structs.EvalStatusPending,
|
||||
CreateTime: now,
|
||||
ModifyTime: now,
|
||||
}
|
||||
|
||||
_, evalIndex, err := j.srv.raftApply(
|
||||
structs.EvalUpdateRequestType,
|
||||
&structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
WriteRequest: structs.WriteRequest{Region: args.Region},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
j.logger.Error("eval create failed", "error", err, "method", "scale")
|
||||
return err
|
||||
}
|
||||
|
||||
reply.EvalID = eval.ID
|
||||
reply.EvalCreateIndex = evalIndex
|
||||
event.ScalingEvent.EvalID = &reply.EvalID
|
||||
}
|
||||
} else {
|
||||
reply.JobModifyIndex = job.ModifyIndex
|
||||
}
|
||||
|
||||
_, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event)
|
||||
if err != nil {
|
||||
j.logger.Error("scaling event create failed", "error", err)
|
||||
@@ -1168,7 +1155,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
|
||||
}
|
||||
|
||||
reply.Index = eventIndex
|
||||
|
||||
j.srv.setQueryMeta(&reply.QueryMeta)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -715,6 +715,42 @@ type JobScaleRequest struct {
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// Validate is used to validate the arguments in the request
|
||||
func (r *JobScaleRequest) Validate() error {
|
||||
namespace := r.Target[ScalingTargetNamespace]
|
||||
if namespace != "" && namespace != r.RequestNamespace() {
|
||||
return NewErrRPCCoded(400, "namespace in payload did not match header")
|
||||
}
|
||||
|
||||
jobID := r.Target[ScalingTargetJob]
|
||||
if jobID != "" && jobID != r.JobID {
|
||||
return fmt.Errorf("job ID in payload did not match URL")
|
||||
}
|
||||
|
||||
groupName := r.Target[ScalingTargetGroup]
|
||||
if groupName == "" {
|
||||
return NewErrRPCCoded(400, "missing task group name for scaling action")
|
||||
}
|
||||
|
||||
if r.Count != nil {
|
||||
if *r.Count < 0 {
|
||||
return NewErrRPCCoded(400, "scaling action count can't be negative")
|
||||
}
|
||||
|
||||
if r.Error {
|
||||
return NewErrRPCCoded(400, "scaling action should not contain count if error is true")
|
||||
}
|
||||
|
||||
truncCount := int(*r.Count)
|
||||
if int64(truncCount) != *r.Count {
|
||||
return NewErrRPCCoded(400,
|
||||
fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", r.Count))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// JobSummaryRequest is used when we just need to get a specific job summary
|
||||
type JobSummaryRequest struct {
|
||||
JobID string
|
||||
|
||||
Reference in New Issue
Block a user