Update the scaling policies when deregistering a job (#25911)

* func: Update the scaling policies when deregistering a job

* func: Add tests for updating the policy

* docs: add changelog

* func: set back the old order

* style: rearrange for clarity and to reuse the watchset

* func: set the policies to the last submitted state when starting a job

* func: expand tests of the start job command to include job submission

* func: Expand the tests to verify the correct state of the scaling policy after job start

* Update command/job_start.go

Co-authored-by: Tim Gross <tgross@hashicorp.com>

* Update nomad/fsm_test.go

Co-authored-by: Tim Gross <tgross@hashicorp.com>

* func: add warning when there is no previous job submission

---------

Co-authored-by: Tim Gross <tgross@hashicorp.com>
Juana De La Cuesta
2025-06-02 16:11:38 +02:00
committed by GitHub
parent ae3eaf80d1
commit bdfd573fc4
8 changed files with 195 additions and 20 deletions

.changelog/25911.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
scaling: Set the scaling policies to disabled when a job is stopped
```
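
For context, a minimal sketch of the behavior this note describes, using the Go API client against a local agent; the job name "example" is hypothetical:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// Stopping (deregistering without purge) now also disables the job's
	// scaling policies, so autoscalers stop acting on a stopped job.
	if _, _, err := client.Jobs().Deregister("example", false, nil); err != nil {
		panic(err)
	}

	policies, _, err := client.Scaling().ListPolicies(nil)
	if err != nil {
		panic(err)
	}
	for _, p := range policies {
		fmt.Printf("policy %s enabled: %v\n", p.ID, p.Enabled)
	}
}
```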

api/jobs.go

@@ -1420,6 +1420,15 @@ func (j *Job) AddSpread(s *Spread) *Job {
	return j
}

// GetScalingPoliciesPerTaskGroup returns the job's scaling policies keyed
// by task group name; groups without a scaling block map to nil.
func (j *Job) GetScalingPoliciesPerTaskGroup() map[string]*ScalingPolicy {
	ret := map[string]*ScalingPolicy{}
	for _, tg := range j.TaskGroups {
		ret[*tg.Name] = tg.Scaling
	}
	return ret
}

type WriteRequest struct {
	// The target region for this write
	Region string
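
A short usage sketch for the new helper; the wrapper function and group name are hypothetical, not part of the change. Since groups without a scaling block map to a nil policy, callers should guard both levels of the lookup:

```go
// scalingEnabledFor reports whether the named group has a scaling policy
// that is currently enabled.
func scalingEnabledFor(job *api.Job, group string) bool {
	p, ok := job.GetScalingPoliciesPerTaskGroup()[group]
	return ok && p != nil && p.Enabled != nil && *p.Enabled
}
```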

api/tasks.go

@@ -707,6 +707,12 @@ func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {
	return g
}

// ScalingPolicy is used to set the scaling policy of a task group.
func (g *TaskGroup) ScalingPolicy(sp *ScalingPolicy) *TaskGroup {
	g.Scaling = sp
	return g
}

// LogConfig provides configuration for log rotation
type LogConfig struct {
	MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
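
The new method slots into the existing fluent TaskGroup builder; a brief sketch mirroring the test-helper change further down (the group name and bounds are illustrative, and pointer is github.com/hashicorp/nomad/helper/pointer):

```go
tg := api.NewTaskGroup("cache", 1).
	ScalingPolicy(&api.ScalingPolicy{
		Min:     pointer.Of(int64(1)),
		Max:     pointer.Of(int64(5)),
		Enabled: pointer.Of(true),
	})
_ = tg
```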

command/job_start.go

@@ -4,11 +4,14 @@
package command

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/api/contexts"
	"github.com/hashicorp/nomad/jobspec2"
	"github.com/posener/complete"
)
@@ -132,6 +135,35 @@ func (c *JobStartCommand) Run(args []string) int {
	// register the job in a not stopped state
	*job.Stop = false

	// When a job is stopped, all its scaling policies are disabled. Before
	// starting the job again, set them back to the last user-submitted state.
	ps := job.GetScalingPoliciesPerTaskGroup()
	if len(ps) > 0 {
		sub, _, err := client.Jobs().Submission(*job.ID, int(*job.Version), &api.QueryOptions{
			Region:    *job.Region,
			Namespace: *job.Namespace,
		})
		if err != nil {
			if _, ok := err.(api.UnexpectedResponseError); !ok {
				c.Ui.Error(fmt.Sprintf("%+T\n", err) + err.Error())
				return 1
			}

			// If the job was submitted using the API, there is no stored
			// submission to restore from.
			c.Ui.Warn("All scaling policies for this job were disabled when it was stopped; resubmit the job to enable them again.")
		} else {
			lastJob, err := parseFromSubmission(sub)
			if err != nil {
				c.Ui.Error(err.Error())
				return 1
			}

			sps := lastJob.GetScalingPoliciesPerTaskGroup()
			for _, tg := range job.TaskGroups {
				tg.Scaling.Enabled = sps[*tg.Name].Enabled
			}
		}
	}
	resp, _, err := client.Jobs().Register(job, nil)

	// Check if the job is periodic or is a parameterized job
@@ -162,3 +194,24 @@ func (c *JobStartCommand) Run(args []string) int {
	mon := newMonitor(c.Ui, client, length)
	return mon.monitor(resp.EvalID)
}

// parseFromSubmission reconstructs a job from its most recent stored
// submission so its original scaling policy state can be recovered.
func parseFromSubmission(sub *api.JobSubmission) (*api.Job, error) {
	var job *api.Job
	var err error

	switch sub.Format {
	case "hcl2":
		job, err = jobspec2.Parse("", strings.NewReader(sub.Source))
		if err != nil {
			return nil, fmt.Errorf("Unable to parse job submission to re-enable scaling policies: %w", err)
		}
	case "json":
		err = json.Unmarshal([]byte(sub.Source), &job)
		if err != nil {
			return nil, fmt.Errorf("Unable to parse job submission to re-enable scaling policies: %w", err)
		}
	default:
		return nil, fmt.Errorf("Unable to re-enable scaling policies: unknown submission format %q", sub.Format)
	}

	return job, nil
}
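
A self-contained sketch of the round trip this helper enables for HCL submissions, assuming a hypothetical job spec: the stored source parses back into an *api.Job whose scaling Enabled flags can then be copied over.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/hashicorp/nomad/jobspec2"
)

func main() {
	// Hypothetical stored submission source.
	src := `
job "example" {
  group "cache" {
    scaling {
      min     = 1
      max     = 5
      enabled = true
    }
    task "redis" {
      driver = "docker"
    }
  }
}`
	job, err := jobspec2.Parse("example.hcl", strings.NewReader(src))
	if err != nil {
		panic(err)
	}
	// The parsed job carries the user-submitted scaling state.
	fmt.Println(*job.TaskGroups[0].Scaling.Enabled) // true
}
```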

command/job_start_test.go

@@ -4,12 +4,14 @@
package command

import (
	"encoding/json"
	"testing"

	"github.com/hashicorp/cli"
	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/command/agent"
	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
@@ -41,7 +43,15 @@ func TestStartCommand(t *testing.T) {
		client, err := cmd.Meta.Client()
		must.NoError(t, err)

		jsonBytes, err := json.Marshal(job)
		must.NoError(t, err)

		_, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{
			Submission: &api.JobSubmission{
				Source: string(jsonBytes),
				Format: "json",
			},
		}, nil)
		must.NoError(t, err)

		waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "")
@@ -53,6 +63,76 @@ func TestStartCommand(t *testing.T) {
		res := cmd.Run([]string{"-address", addr, *job.ID})
		must.Zero(t, res)

		pol, _, err := client.Scaling().ListPolicies(nil)
		must.NoError(t, err)
		must.One(t, len(pol))
		must.True(t, *job.TaskGroups[0].Scaling.Enabled)
	})

	t.Run("succeeds when starting a stopped job with disabled scaling policies and no submissions", func(t *testing.T) {
		job := testJob(uuid.Generate())

		client, err := cmd.Meta.Client()
		must.NoError(t, err)

		job.TaskGroups[0].Scaling.Enabled = pointer.Of(false)
		_, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{}, nil)
		must.NoError(t, err)

		waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "")

		_, _, err = client.Jobs().Deregister(*job.ID, false, nil)
		must.Nil(t, err)
		waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusComplete, "")

		res := cmd.Run([]string{"-address", addr, *job.ID})
		must.Zero(t, res)

		pol, _, err := client.Scaling().ListPolicies(nil)
		must.NoError(t, err)
		must.One(t, len(pol))
		must.False(t, *job.TaskGroups[0].Scaling.Enabled)
	})

	t.Run("succeeds when starting a stopped job with enabled scaling policies", func(t *testing.T) {
		job := testJob(uuid.Generate())

		client, err := cmd.Meta.Client()
		must.NoError(t, err)

		job.TaskGroups[0].Scaling.Enabled = pointer.Of(true)
		jsonBytes, err := json.Marshal(job)
		must.NoError(t, err)

		_, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{
			Submission: &api.JobSubmission{
				Source: string(jsonBytes),
				Format: "json",
			},
		}, nil)
		must.NoError(t, err)

		waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "")

		_, _, err = client.Jobs().Deregister(*job.ID, false, nil)
		must.Nil(t, err)
		waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusComplete, "")

		res := cmd.Run([]string{"-address", addr, *job.ID})
		must.Zero(t, res)

		pol, _, err := client.Scaling().ListPolicies(nil)
		must.NoError(t, err)
		must.One(t, len(pol))
		must.True(t, *job.TaskGroups[0].Scaling.Enabled)
	})

	t.Run("fails to start a job not previously stopped", func(t *testing.T) {

command/util_test.go

@@ -74,7 +74,11 @@ func testJob(jobID string) *api.Job {
		AddTask(task).
		RequireDisk(&api.EphemeralDisk{
			SizeMB: pointer.Of(20),
		}).
		ScalingPolicy(&api.ScalingPolicy{
			Min:     pointer.Of(int64(1)),
			Max:     pointer.Of(int64(5)),
			Enabled: pointer.Of(true),
		})

	job := api.NewBatchJob(jobID, jobID, "global", 1).
		AddDatacenter("dc1").

nomad/fsm.go

@@ -800,6 +800,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
// handleJobDeregister is used to deregister a job. Leaves error logging up to
// caller.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, submitTime int64, noShutdownDelay bool, tx state.Txn) error {
	// If it is periodic remove it from the dispatcher
	if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
		return fmt.Errorf("periodicDispatcher.Remove failed: %w", err)
@@ -833,27 +834,34 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
		// the job was updated to be non-periodic, thus checking if it is periodic
		// doesn't ensure we clean it up properly.
		n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx)

		return nil
	}

	// Get the current job and mark it as stopped and re-insert it.
	ws := memdb.NewWatchSet()
	current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx)
	if err != nil {
		return fmt.Errorf("JobByID lookup failed: %w", err)
	}

	if current == nil {
		return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
	}

	stopped := current.Copy()
	stopped.Stop = true
	if submitTime != 0 {
		stopped.SubmitTime = submitTime
	}

	// Disable scaling policies to avoid monitoring stopped jobs
	scalingPolicies := stopped.GetScalingPolicies()
	for _, policy := range scalingPolicies {
		policy.Enabled = false
	}

	if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil {
		return fmt.Errorf("UpsertJob failed: %w", err)
	}

	return nil

nomad/fsm_test.go

@@ -977,12 +977,19 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
	fsm := testFSM(t)

	job := mock.PeriodicJob()
	job.TaskGroups[0].Scaling = &structs.ScalingPolicy{
		ID:      "mockID",
		Enabled: true,
	}

	req := structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Namespace: job.Namespace,
		},
	}

	buf, err := structs.Encode(structs.JobRegisterRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
@@ -1040,6 +1047,11 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
	if launchOut == nil {
		t.Fatalf("launch not found!")
	}

	// Verify the scaling policies were disabled
	for _, policy := range jobOut.GetScalingPolicies() {
		must.False(t, policy.Enabled)
	}
}

func TestFSM_BatchDeregisterJob(t *testing.T) {