From bdfd573fc4e3a88fcdc267228f330f5b0490879e Mon Sep 17 00:00:00 2001 From: Juana De La Cuesta Date: Mon, 2 Jun 2025 16:11:38 +0200 Subject: [PATCH] Update the scaling policies when deregistering a job (#25911) * func: Update the scaling policies when deregistering a job * func: Add tests for updating the policy * docs: add changelog * func: set back the old order * style: rearrange for clarity and to reuse the watchset * func: set the policies to teh last submitted when starting a job * func: expand tests of teh start job command to include job submission * func: Expand the tests to verify the correct state of the scaling policy after job start * Update command/job_start.go Co-authored-by: Tim Gross * Update nomad/fsm_test.go Co-authored-by: Tim Gross * func: add warning when there is no previous job submission --------- Co-authored-by: Tim Gross --- .changelog/25911.txt | 3 ++ api/jobs.go | 9 +++++ api/tasks.go | 6 +++ command/job_start.go | 53 +++++++++++++++++++++++++ command/job_start_test.go | 82 ++++++++++++++++++++++++++++++++++++++- command/testing_test.go | 6 ++- nomad/fsm.go | 44 ++++++++++++--------- nomad/fsm_test.go | 12 ++++++ 8 files changed, 195 insertions(+), 20 deletions(-) create mode 100644 .changelog/25911.txt diff --git a/.changelog/25911.txt b/.changelog/25911.txt new file mode 100644 index 000000000..9f9890d92 --- /dev/null +++ b/.changelog/25911.txt @@ -0,0 +1,3 @@ +```release-note:bug +scaling: Set the scaling policies to disabled when a job is stopped +``` diff --git a/api/jobs.go b/api/jobs.go index dc52270a3..e5c52f935 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -1420,6 +1420,15 @@ func (j *Job) AddSpread(s *Spread) *Job { return j } +func (j *Job) GetScalingPoliciesPerTaskGroup() map[string]*ScalingPolicy { + ret := map[string]*ScalingPolicy{} + for _, tg := range j.TaskGroups { + ret[*tg.Name] = tg.Scaling + } + + return ret +} + type WriteRequest struct { // The target region for this write Region string diff --git a/api/tasks.go b/api/tasks.go index e36ae130d..3c8c8554e 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -707,6 +707,12 @@ func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup { return g } +// AddSpread is used to add a new spread preference to a task group. +func (g *TaskGroup) ScalingPolicy(sp *ScalingPolicy) *TaskGroup { + g.Scaling = sp + return g +} + // LogConfig provides configuration for log rotation type LogConfig struct { MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"` diff --git a/command/job_start.go b/command/job_start.go index 09a99634a..bded5ec4b 100644 --- a/command/job_start.go +++ b/command/job_start.go @@ -4,11 +4,14 @@ package command import ( + "encoding/json" "fmt" "strings" "time" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" + "github.com/hashicorp/nomad/jobspec2" "github.com/posener/complete" ) @@ -132,6 +135,35 @@ func (c *JobStartCommand) Run(args []string) int { // register the job in a not stopped state *job.Stop = false + // When a job is stopped, all its scaling policies are disabled. Before + // starting the job again, set them back to the last user submitted state. + ps := job.GetScalingPoliciesPerTaskGroup() + if len(ps) > 0 { + sub, _, err := client.Jobs().Submission(*job.ID, int(*job.Version), &api.QueryOptions{ + Region: *job.Region, + Namespace: *job.Namespace, + }) + if err != nil { + if _, ok := err.(api.UnexpectedResponseError); !ok { + c.Ui.Error(fmt.Sprintf("%+T\n", err) + err.Error()) + return 1 + } + // If the job was submitted using the API, there are no submissions stored. + c.Ui.Warn("All scaling policies for this job were disabled when it was stopped, resubmit it to enable them again.") + } else { + lastJob, err := parseFromSubmission(sub) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + + sps := lastJob.GetScalingPoliciesPerTaskGroup() + for _, tg := range job.TaskGroups { + tg.Scaling.Enabled = sps[*tg.Name].Enabled + } + } + } + resp, _, err := client.Jobs().Register(job, nil) // Check if the job is periodic or is a parameterized job @@ -162,3 +194,24 @@ func (c *JobStartCommand) Run(args []string) int { mon := newMonitor(c.Ui, client, length) return mon.monitor(resp.EvalID) } + +func parseFromSubmission(sub *api.JobSubmission) (*api.Job, error) { + var job *api.Job + var err error + + switch sub.Format { + case "hcl2": + job, err = jobspec2.Parse("", strings.NewReader(sub.Source)) + if err != nil { + return nil, fmt.Errorf("Unable to parse job submission to re-enable scaling policies: %w", err) + } + + case "json": + err = json.Unmarshal([]byte(sub.Source), &job) + if err != nil { + return nil, fmt.Errorf("Unable to parse job submission to re-enable scaling policies: %w", err) + } + } + + return job, nil +} diff --git a/command/job_start_test.go b/command/job_start_test.go index 191f1e542..fd8d9822b 100644 --- a/command/job_start_test.go +++ b/command/job_start_test.go @@ -4,12 +4,14 @@ package command import ( + "encoding/json" "testing" "github.com/hashicorp/cli" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/command/agent" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -41,7 +43,15 @@ func TestStartCommand(t *testing.T) { client, err := cmd.Meta.Client() must.NoError(t, err) - _, _, err = client.Jobs().Register(job, nil) + jsonBytes, err := json.Marshal(job) + must.NoError(t, err) + + _, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{ + Submission: &api.JobSubmission{ + Source: string(jsonBytes), + Format: "json", + }, + }, nil) must.NoError(t, err) waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "") @@ -53,6 +63,76 @@ func TestStartCommand(t *testing.T) { res := cmd.Run([]string{"-address", addr, *job.ID}) must.Zero(t, res) + + pol, _, err := client.Scaling().ListPolicies(nil) + must.NoError(t, err) + must.One(t, len(pol)) + must.True(t, *job.TaskGroups[0].Scaling.Enabled) + + }) + + t.Run("succeeds when starting a stopped job with disabled scaling policies and no submissions", func(t *testing.T) { + job := testJob(uuid.Generate()) + + client, err := cmd.Meta.Client() + must.NoError(t, err) + + job.TaskGroups[0].Scaling.Enabled = pointer.Of(false) + + _, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{}, nil) + must.NoError(t, err) + + waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "") + + _, _, err = client.Jobs().Deregister(*job.ID, false, nil) + must.Nil(t, err) + + waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusComplete, "") + + res := cmd.Run([]string{"-address", addr, *job.ID}) + must.Zero(t, res) + + pol, _, err := client.Scaling().ListPolicies(nil) + must.NoError(t, err) + must.One(t, len(pol)) + must.False(t, *job.TaskGroups[0].Scaling.Enabled) + + }) + + t.Run("succeeds when starting a stopped job with enabled scaling policies", func(t *testing.T) { + job := testJob(uuid.Generate()) + + client, err := cmd.Meta.Client() + must.NoError(t, err) + + job.TaskGroups[0].Scaling.Enabled = pointer.Of(true) + + jsonBytes, err := json.Marshal(job) + must.NoError(t, err) + + _, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{ + Submission: &api.JobSubmission{ + Source: string(jsonBytes), + Format: "json", + }, + }, nil) + must.NoError(t, err) + + waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "") + + _, _, err = client.Jobs().Deregister(*job.ID, false, nil) + must.Nil(t, err) + + waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusComplete, "") + + res := cmd.Run([]string{"-address", addr, *job.ID}) + must.Zero(t, res) + + pol, _, err := client.Scaling().ListPolicies(nil) + must.NoError(t, err) + must.One(t, len(pol)) + must.True(t, *job.TaskGroups[0].Scaling.Enabled) + }) t.Run("fails to start a job not previously stopped", func(t *testing.T) { diff --git a/command/testing_test.go b/command/testing_test.go index ec4f611d1..ffc2fe6b4 100644 --- a/command/testing_test.go +++ b/command/testing_test.go @@ -74,7 +74,11 @@ func testJob(jobID string) *api.Job { AddTask(task). RequireDisk(&api.EphemeralDisk{ SizeMB: pointer.Of(20), - }) + }).ScalingPolicy(&api.ScalingPolicy{ + Min: pointer.Of(int64(1)), + Max: pointer.Of(int64(5)), + Enabled: pointer.Of(true), + }) job := api.NewBatchJob(jobID, jobID, "global", 1). AddDatacenter("dc1"). diff --git a/nomad/fsm.go b/nomad/fsm.go index f9811b8f1..1816a7747 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -800,6 +800,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // handleJobDeregister is used to deregister a job. Leaves error logging up to // caller. func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, submitTime int64, noShutdownDelay bool, tx state.Txn) error { + // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { return fmt.Errorf("periodicDispatcher.Remove failed: %w", err) @@ -833,27 +834,34 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu // the job was updated to be non-periodic, thus checking if it is periodic // doesn't ensure we clean it up properly. n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx) - } else { - // Get the current job and mark it as stopped and re-insert it. - ws := memdb.NewWatchSet() - current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx) - if err != nil { - return fmt.Errorf("JobByID lookup failed: %w", err) - } + return nil + } - if current == nil { - return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace) - } + // Get the current job and mark it as stopped and re-insert it. + ws := memdb.NewWatchSet() + current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx) + if err != nil { + return fmt.Errorf("JobByID lookup failed: %w", err) + } - stopped := current.Copy() - stopped.Stop = true - if submitTime != 0 { - stopped.SubmitTime = submitTime - } + if current == nil { + return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace) + } - if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil { - return fmt.Errorf("UpsertJob failed: %w", err) - } + stopped := current.Copy() + stopped.Stop = true + if submitTime != 0 { + stopped.SubmitTime = submitTime + } + + // Disable scaling policies to avoid monitoring stopped jobs + scalingPolicies := stopped.GetScalingPolicies() + for _, policy := range scalingPolicies { + policy.Enabled = false + } + + if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil { + return fmt.Errorf("UpsertJob failed: %w", err) } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 7ff6650ea..b75808596 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -977,12 +977,19 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) { fsm := testFSM(t) job := mock.PeriodicJob() + + job.TaskGroups[0].Scaling = &structs.ScalingPolicy{ + ID: "mockID", + Enabled: true, + } + req := structs.JobRegisterRequest{ Job: job, WriteRequest: structs.WriteRequest{ Namespace: job.Namespace, }, } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) @@ -1040,6 +1047,11 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) { if launchOut == nil { t.Fatalf("launch not found!") } + + // Verify the scaling policies were disabled + for _, policy := range jobOut.GetScalingPolicies() { + must.False(t, policy.Enabled) + } } func TestFSM_BatchDeregisterJob(t *testing.T) {