From d29ac461a7229ca61da9561c1058e14aecbe4b90 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 29 Nov 2023 13:01:19 -0500 Subject: [PATCH] cli: non-service jobs on `job restart -reschedule` (#19147) The `-reschedule` flag stops allocations and assumes the Nomad scheduler will create new allocations to replace them. But this is only true for service and batch jobs. Restarting non-service jobs with the `-reschedule` flag causes the command to loop forever waiting for the allocations to be replaced, which never happens. Allocations for system jobs may be replaced by triggering an evaluation after each stop to cause the reconciler to run again. Sysbatch jobs should not be allowed to be rescheduled as they are never replaced by the scheduler. --- .changelog/19147.txt | 3 + command/job_restart.go | 25 +- command/job_restart_test.go | 290 +++++++++++++++--- website/content/docs/commands/job/restart.mdx | 3 +- 4 files changed, 273 insertions(+), 48 deletions(-) create mode 100644 .changelog/19147.txt diff --git a/.changelog/19147.txt b/.changelog/19147.txt new file mode 100644 index 000000000..2122c8732 --- /dev/null +++ b/.changelog/19147.txt @@ -0,0 +1,3 @@ +```release-note:bug +cli: Fixed the `nomad job restart` command to create replacements for batch and system jobs and to prevent sysbatch jobs from being rescheduled since they never create replacements +``` diff --git a/command/job_restart.go b/command/job_restart.go index 476ac0798..f7e1d3db5 100644 --- a/command/job_restart.go +++ b/command/job_restart.go @@ -187,7 +187,8 @@ Restart Options: in-place. Since the group is not modified the restart does not create a new deployment, and so values defined in 'update' blocks, such as 'max_parallel', are not taken into account. This option cannot be used with - '-task'. + '-task'. Only jobs of type 'batch', 'service', and 'system' can be + rescheduled. -task= Specify the task to restart. Can be specified multiple times. If groups are @@ -286,6 +287,16 @@ func (c *JobRestartCommand) Run(args []string) int { go c.handleSignal(c.sigsCh, activeCh) + // Verify job type can be rescheduled. + if c.reschedule { + switch *job.Type { + case api.JobTypeBatch, api.JobTypeService, api.JobTypeSystem: + default: + c.Ui.Error(fmt.Sprintf("Jobs of type %q are not allowed to be rescheduled.", *job.Type)) + return 1 + } + } + // Confirm that we should restart a multi-region job in a single region. if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() { c.Ui.Output("\nJob restart canceled.") @@ -952,6 +963,18 @@ func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error { return fmt.Errorf("Failed to stop allocation: %w", err) } + // Allocations for system jobs do not get replaced by the scheduler after + // being stopped, so an eval is needed to trigger the reconciler. + if *alloc.Job.Type == api.JobTypeSystem { + opts := api.EvalOptions{ + ForceReschedule: true, + } + _, _, err := c.client.Jobs().EvaluateWithOpts(*alloc.Job.ID, opts, nil) + if err != nil { + return fmt.Errorf("Failed evaluate job: %w", err) + } + } + // errCh receives an error if anything goes wrong or nil when the // replacement allocation is running. // Use a buffered channel to prevent both goroutine from blocking trying to diff --git a/command/job_restart_test.go b/command/job_restart_test.go index ee50f935f..1af1c76f6 100644 --- a/command/job_restart_test.go +++ b/command/job_restart_test.go @@ -259,8 +259,19 @@ func TestJobRestartCommand_Run(t *testing.T) { SetConfig("run_for", "1m"). SetConfig("exit_code", 0) - jobID := "test_job_restart_cmd" - job := api.NewServiceJob(jobID, jobID, "global", 1). + batchJob := api.NewBatchJob("test_job_batch", "test_job_batch", "global", 1). + AddDatacenter("dc1"). + AddTaskGroup( + api.NewTaskGroup("single_task", 3). + AddTask(mainTask), + ). + AddTaskGroup( + api.NewTaskGroup("multiple_tasks", 2). + AddTask(prestartTask). + AddTask(sidecarTask). + AddTask(mainTask), + ) + serviceJob := api.NewServiceJob("test_job_service", "test_job_service", "global", 1). AddDatacenter("dc1"). AddTaskGroup( api.NewTaskGroup("single_task", 3). @@ -613,55 +624,195 @@ func TestJobRestartCommand_Run(t *testing.T) { }, } - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - // Run each test case in parallel because they are fairly slow. - ci.Parallel(t) + for _, job := range []*api.Job{batchJob, serviceJob} { + for _, tc := range testCases { + tc := tc + t.Run(fmt.Sprintf("%s/%s", *job.Type, tc.name), func(t *testing.T) { + // Run each test case in parallel because they are fairly slow. + ci.Parallel(t) + // Initialize UI and command. + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + // User separate cluster for each test case so they can run in + // parallel without affecting each other. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job and wait for its allocs to be running. + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) + + allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil) + must.NoError(t, err) + for _, alloc := range allocStubs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Fetch allocations before the restart so we know which ones are + // supposed to be affected in case the test reschedules allocs. + allocStubs, _, err = client.Jobs().Allocations(*job.ID, true, nil) + must.NoError(t, err) + + // Prepend server URL and append job ID to the test case command. + args := []string{"-address", url, "-yes"} + args = append(args, tc.args...) + args = append(args, *job.ID) + + // Run job restart command. + code = cmd.Run(args) + must.Eq(t, code, tc.expectedCode) + + // Run test case validation function. + if tc.validateFn != nil { + tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String()) + } + }) + } + } +} + +func TestJobRestartCommand_Run_system_reschedule(t *testing.T) { + ci.Parallel(t) + + // Create a system job. + job := api.NewSystemJob("test_job", "test_job", "global", 100). + AddDatacenter("dc1"). + AddTaskGroup( + api.NewTaskGroup("group", 1). + AddTask( + api.NewTask("task", "mock_driver"). + SetConfig("run_for", "1m"). + SetConfig("exit_code", 0), + ), + ) + + // Start a server and 3 clients. + srv, client, url := testServer(t, false, nil) + defer srv.Shutdown() + + srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC + testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr)) + testClient(t, "client2", newClientAgentConfigFunc("", "", srvRPCAddr)) + testClient(t, "client3", newClientAgentConfigFunc("", "", srvRPCAddr)) + + waitForNodes(t, client) + + // Initialize UI and command. + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Register test job and wait for its allocs to be running. + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) + + allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil) + must.NoError(t, err) + for _, alloc := range allocStubs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Run job restart command. + args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *job.ID} + code = cmd.Run(args) + must.Eq(t, code, 0) + + reschedules := map[string]bool{} + for _, alloc := range allocStubs { + reschedules[alloc.ID] = true + } + waitAllocsRescheduled(t, client, reschedules) + + // Check that allocations were rescheduled properly. + stdout := ui.OutputWriter.String() + must.StrContains(t, stdout, "Restarting 3 allocations") + for _, alloc := range allocStubs { + must.StrContains(t, stdout, fmt.Sprintf(`Rescheduling allocation "%s"`, alloc.ID)) + must.StrContains(t, stdout, fmt.Sprintf(`Allocation "%s" replaced by`, alloc.ID)) + } +} + +func TestJobRestartCommand_Run_rescheduleNotSupported(t *testing.T) { + ci.Parallel(t) + + // Create a sysbatch job. + sysbatchJob := api.NewSysbatchJob("test_sysbatch_job", "test_sysbatch_job", "global", 100). + AddDatacenter("dc1"). + AddTaskGroup( + api.NewTaskGroup("group", 1). + AddTask( + api.NewTask("task", "mock_driver"). + SetConfig("run_for", "1m"). + SetConfig("exit_code", 0), + ), + ) + + // Start a server and a client. + srv, client, url := testServer(t, false, nil) + defer srv.Shutdown() + + srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC + testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr)) + waitForNodes(t, client) + + testCases := []struct { + name string + job *api.Job + }{ + { + name: "sysbatch job", + job: sysbatchJob, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { // Initialize UI and command. ui := cli.NewMockUi() cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} - // Start client and server and wait for node to be ready. - // User separate cluster for each test case so they can run in - // parallel without affecting each other. - srv, client, url := testServer(t, true, nil) - defer srv.Shutdown() - - waitForNodes(t, client) - // Register test job and wait for its allocs to be running. - resp, _, err := client.Jobs().Register(job, nil) + resp, _, err := client.Jobs().Register(tc.job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) - allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil) + allocStubs, _, err := client.Jobs().Allocations(*tc.job.ID, true, nil) must.NoError(t, err) for _, alloc := range allocStubs { waitForAllocRunning(t, client, alloc.ID) } - // Fetch allocations before the restart so we know which ones are - // supposed to be affected in case the test reschedules allocs. - allocStubs, _, err = client.Jobs().Allocations(jobID, true, nil) - must.NoError(t, err) - - // Prepend server URL and append job ID to the test case command. - args := []string{"-address", url, "-yes"} - args = append(args, tc.args...) - args = append(args, jobID) - - // Run job restart command. + // Run job restart command and expect error. + args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *tc.job.ID} code = cmd.Run(args) - must.Eq(t, code, tc.expectedCode) + must.Eq(t, code, 1) - // Run test case validation function. - if tc.validateFn != nil { - tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String()) - } + stderr := ui.ErrorWriter.String() + must.StrContains(t, stderr, "not allowed to be rescheduled") }) } } @@ -752,10 +903,18 @@ func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) { code := cmd.Run(args) if tc.expectedErr != "" { - must.NonZero(t, code) + must.NonZero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) } else { - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) } }) } @@ -791,7 +950,11 @@ func TestJobRestartCommand_noAllocs(t *testing.T) { "-yes", jobID, }) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart") } @@ -810,13 +973,19 @@ func TestJobRestartCommand_rescheduleFail(t *testing.T) { // Register test job with 3 allocs. jobID := "test_job_restart_reschedule_fail" job := testJob(jobID) + job.Type = pointer.Of(api.JobTypeService) job.TaskGroups[0].Count = pointer.Of(3) + job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"} resp, _, err := client.Jobs().Register(job, nil) must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) ui.OutputWriter.Reset() // Wait for allocs to be running. @@ -863,7 +1032,11 @@ func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) { must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) } ui.OutputWriter.Reset() @@ -1076,9 +1249,17 @@ namespace "default" { // Run command. code := cmd.Run(args) if tc.expectedErr == "" { - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) } else { - must.One(t, code) + must.One(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) } }) @@ -1122,8 +1303,9 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { jobID := nonAlphaNum.ReplaceAllString(tc.name, "-") job := testJob(jobID) + job.Type = pointer.Of(api.JobTypeService) job.TaskGroups[0].Count = pointer.Of(2) - job.TaskGroups[0].Tasks[0].Config["run_for"] = "10m" + job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"} job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay job.TaskGroups[0].Tasks[0].Services = []*api.Service{{ Name: "service", @@ -1134,7 +1316,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout:\n%s\n\nstderr:\n%s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) ui.OutputWriter.Reset() // Wait for alloc to be running. @@ -1155,7 +1341,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { args = append(args, jobID) code = cmd.Run(args) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout:\n%s\n\nstderr:\n%s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) // Wait for all allocs to restart. reschedules := map[string]bool{} @@ -1380,7 +1570,11 @@ func TestJobRestartCommand_filterAllocs(t *testing.T) { args := append(tc.args, "-verbose", "-yes", "example") code, err := cmd.parseAndValidate(args) must.NoError(t, err) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) got := cmd.filterAllocs(allAllocs) must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool { @@ -1423,7 +1617,11 @@ func TestJobRestartCommand_onErrorFail(t *testing.T) { must.NoError(t, err) code := waitForSuccess(ui, client, fullId, t, resp.EvalID) - must.Zero(t, code) + must.Zero(t, code, must.Sprintf( + "stdout: %s\n\nstderr: %s\n", + ui.OutputWriter.String(), + ui.ErrorWriter.String()), + ) ui.OutputWriter.Reset() // Create a proxy to inject an error after 2 allocation restarts. diff --git a/website/content/docs/commands/job/restart.mdx b/website/content/docs/commands/job/restart.mdx index 601fce1bf..6f87e40a8 100644 --- a/website/content/docs/commands/job/restart.mdx +++ b/website/content/docs/commands/job/restart.mdx @@ -86,7 +86,8 @@ of the exact job ID. restarted in-place. Since the group is not modified the restart does not create a new deployment, and so values defined in [`update`][] blocks, such as [`max_parallel`][], are not taken into account. This option cannot be used - with `-task`. + with `-task`. Only jobs of type `batch`, `service`, and `system` can be + rescheduled. - `-on-error=`: Determines what action to take when an error happens during a restart batch. If `ask` the command stops and waits for user