cli: non-service jobs on job restart -reschedule (#19147)

The `-reschedule` flag stops allocations and assumes the Nomad scheduler
will create new allocations to replace them. But this is only true for
service and batch jobs.

Restarting non-service jobs with the `-reschedule` flag causes the
command to loop forever waiting for the allocations to be replaced,
which never happens.

Allocations for system jobs may be replaced by triggering an evaluation
after each stop to cause the reconciler to run again.

Sysbatch jobs should not be allowed to be rescheduled as they are never
replaced by the scheduler.
This commit is contained in:
Luiz Aoqui
2023-11-29 13:01:19 -05:00
committed by GitHub
parent 4e7ad58d2d
commit d29ac461a7
4 changed files with 273 additions and 48 deletions

3
.changelog/19147.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
cli: Fixed the `nomad job restart` command to create replacements for batch and system jobs and to prevent sysbatch jobs from being rescheduled since they never create replacements
```

View File

@@ -187,7 +187,8 @@ Restart Options:
in-place. Since the group is not modified the restart does not create a new
deployment, and so values defined in 'update' blocks, such as
'max_parallel', are not taken into account. This option cannot be used with
'-task'.
'-task'. Only jobs of type 'batch', 'service', and 'system' can be
rescheduled.
-task=<task-name>
Specify the task to restart. Can be specified multiple times. If groups are
@@ -286,6 +287,16 @@ func (c *JobRestartCommand) Run(args []string) int {
go c.handleSignal(c.sigsCh, activeCh)
// Verify job type can be rescheduled.
if c.reschedule {
switch *job.Type {
case api.JobTypeBatch, api.JobTypeService, api.JobTypeSystem:
default:
c.Ui.Error(fmt.Sprintf("Jobs of type %q are not allowed to be rescheduled.", *job.Type))
return 1
}
}
// Confirm that we should restart a multi-region job in a single region.
if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() {
c.Ui.Output("\nJob restart canceled.")
@@ -952,6 +963,18 @@ func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error {
return fmt.Errorf("Failed to stop allocation: %w", err)
}
// Allocations for system jobs do not get replaced by the scheduler after
// being stopped, so an eval is needed to trigger the reconciler.
if *alloc.Job.Type == api.JobTypeSystem {
opts := api.EvalOptions{
ForceReschedule: true,
}
_, _, err := c.client.Jobs().EvaluateWithOpts(*alloc.Job.ID, opts, nil)
if err != nil {
return fmt.Errorf("Failed to evaluate job: %w", err)
}
}
// errCh receives an error if anything goes wrong or nil when the
// replacement allocation is running.
// Use a buffered channel to prevent both goroutines from blocking trying to

View File

@@ -259,8 +259,19 @@ func TestJobRestartCommand_Run(t *testing.T) {
SetConfig("run_for", "1m").
SetConfig("exit_code", 0)
jobID := "test_job_restart_cmd"
job := api.NewServiceJob(jobID, jobID, "global", 1).
batchJob := api.NewBatchJob("test_job_batch", "test_job_batch", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(
api.NewTaskGroup("single_task", 3).
AddTask(mainTask),
).
AddTaskGroup(
api.NewTaskGroup("multiple_tasks", 2).
AddTask(prestartTask).
AddTask(sidecarTask).
AddTask(mainTask),
)
serviceJob := api.NewServiceJob("test_job_service", "test_job_service", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(
api.NewTaskGroup("single_task", 3).
@@ -613,9 +624,10 @@ func TestJobRestartCommand_Run(t *testing.T) {
},
}
for _, job := range []*api.Job{batchJob, serviceJob} {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s/%s", *job.Type, tc.name), func(t *testing.T) {
// Run each test case in parallel because they are fairly slow.
ci.Parallel(t)
@@ -636,9 +648,13 @@ func TestJobRestartCommand_Run(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil)
allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil)
must.NoError(t, err)
for _, alloc := range allocStubs {
waitForAllocRunning(t, client, alloc.ID)
@@ -646,13 +662,13 @@ func TestJobRestartCommand_Run(t *testing.T) {
// Fetch allocations before the restart so we know which ones are
// supposed to be affected in case the test reschedules allocs.
allocStubs, _, err = client.Jobs().Allocations(jobID, true, nil)
allocStubs, _, err = client.Jobs().Allocations(*job.ID, true, nil)
must.NoError(t, err)
// Prepend server URL and append job ID to the test case command.
args := []string{"-address", url, "-yes"}
args = append(args, tc.args...)
args = append(args, jobID)
args = append(args, *job.ID)
// Run job restart command.
code = cmd.Run(args)
@@ -664,6 +680,141 @@ func TestJobRestartCommand_Run(t *testing.T) {
}
})
}
}
}
// TestJobRestartCommand_Run_system_reschedule verifies that the
// `job restart -reschedule` command replaces the allocations of a system
// job. Stopped system allocations are not replaced automatically by the
// scheduler, so the command must trigger an extra evaluation after each
// stop; this test exercises that path end-to-end.
func TestJobRestartCommand_Run_system_reschedule(t *testing.T) {
	ci.Parallel(t)

	// Create a system job.
	job := api.NewSystemJob("test_job", "test_job", "global", 100).
		AddDatacenter("dc1").
		AddTaskGroup(
			api.NewTaskGroup("group", 1).
				AddTask(
					api.NewTask("task", "mock_driver").
						SetConfig("run_for", "1m").
						SetConfig("exit_code", 0),
				),
		)

	// Start a server and 3 clients. The test later asserts that exactly 3
	// allocations are restarted, one per client node.
	srv, client, url := testServer(t, false, nil)
	defer srv.Shutdown()

	srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC
	testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr))
	testClient(t, "client2", newClientAgentConfigFunc("", "", srvRPCAddr))
	testClient(t, "client3", newClientAgentConfigFunc("", "", srvRPCAddr))
	waitForNodes(t, client)

	// Initialize UI and command.
	ui := cli.NewMockUi()
	cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}

	// Register test job and wait for its allocs to be running.
	resp, _, err := client.Jobs().Register(job, nil)
	must.NoError(t, err)

	code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
	must.Zero(t, code, must.Sprintf(
		"stdout: %s\n\nstderr: %s\n",
		ui.OutputWriter.String(),
		ui.ErrorWriter.String()),
	)
	allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil)
	must.NoError(t, err)
	for _, alloc := range allocStubs {
		waitForAllocRunning(t, client, alloc.ID)
	}

	// Run job restart command.
	args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *job.ID}
	code = cmd.Run(args)
	must.Eq(t, code, 0)

	// Wait until every pre-restart allocation has been replaced by a new one.
	reschedules := map[string]bool{}
	for _, alloc := range allocStubs {
		reschedules[alloc.ID] = true
	}
	waitAllocsRescheduled(t, client, reschedules)

	// Check that allocations were rescheduled properly.
	stdout := ui.OutputWriter.String()
	must.StrContains(t, stdout, "Restarting 3 allocations")
	for _, alloc := range allocStubs {
		must.StrContains(t, stdout, fmt.Sprintf(`Rescheduling allocation "%s"`, alloc.ID))
		must.StrContains(t, stdout, fmt.Sprintf(`Allocation "%s" replaced by`, alloc.ID))
	}
}
// TestJobRestartCommand_Run_rescheduleNotSupported checks that the
// `job restart -reschedule` command fails for job types whose stopped
// allocations are never replaced by the scheduler (currently sysbatch).
func TestJobRestartCommand_Run_rescheduleNotSupported(t *testing.T) {
	ci.Parallel(t)

	// Create a sysbatch job.
	sysbatchJob := api.NewSysbatchJob("test_sysbatch_job", "test_sysbatch_job", "global", 100).
		AddDatacenter("dc1").
		AddTaskGroup(
			api.NewTaskGroup("group", 1).
				AddTask(
					api.NewTask("task", "mock_driver").
						SetConfig("run_for", "1m").
						SetConfig("exit_code", 0),
				),
		)

	// Start a server and a client.
	srv, client, url := testServer(t, false, nil)
	defer srv.Shutdown()

	rpcAddr := srv.GetConfig().AdvertiseAddrs.RPC
	testClient(t, "client1", newClientAgentConfigFunc("", "", rpcAddr))
	waitForNodes(t, client)

	cases := []struct {
		name string
		job  *api.Job
	}{
		{
			name: "sysbatch job",
			job:  sysbatchJob,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// Initialize UI and command.
			mockUI := cli.NewMockUi()
			restartCmd := &JobRestartCommand{Meta: Meta{Ui: mockUI}}

			// Register test job and wait for its allocs to be running.
			regResp, _, err := client.Jobs().Register(c.job, nil)
			must.NoError(t, err)

			code := waitForSuccess(mockUI, client, fullId, t, regResp.EvalID)
			must.Zero(t, code, must.Sprintf(
				"stdout: %s\n\nstderr: %s\n",
				mockUI.OutputWriter.String(),
				mockUI.ErrorWriter.String()),
			)

			stubs, _, err := client.Jobs().Allocations(*c.job.ID, true, nil)
			must.NoError(t, err)
			for _, stub := range stubs {
				waitForAllocRunning(t, client, stub.ID)
			}

			// Run job restart command and expect error.
			cliArgs := append(
				[]string{"-address", url, "-yes", "-verbose", "-reschedule"},
				*c.job.ID,
			)
			code = restartCmd.Run(cliArgs)
			must.Eq(t, code, 1)

			// The command must explain why the job cannot be rescheduled.
			errOut := mockUI.ErrorWriter.String()
			must.StrContains(t, errOut, "not allowed to be rescheduled")
		})
	}
}
func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) {
@@ -752,10 +903,18 @@ func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) {
code := cmd.Run(args)
if tc.expectedErr != "" {
must.NonZero(t, code)
must.NonZero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr)
} else {
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
}
})
}
@@ -791,7 +950,11 @@ func TestJobRestartCommand_noAllocs(t *testing.T) {
"-yes",
jobID,
})
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart")
}
@@ -810,13 +973,19 @@ func TestJobRestartCommand_rescheduleFail(t *testing.T) {
// Register test job with 3 allocs.
jobID := "test_job_restart_reschedule_fail"
job := testJob(jobID)
job.Type = pointer.Of(api.JobTypeService)
job.TaskGroups[0].Count = pointer.Of(3)
job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"}
resp, _, err := client.Jobs().Register(job, nil)
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
ui.OutputWriter.Reset()
// Wait for allocs to be running.
@@ -863,7 +1032,11 @@ func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
}
ui.OutputWriter.Reset()
@@ -1076,9 +1249,17 @@ namespace "default" {
// Run command.
code := cmd.Run(args)
if tc.expectedErr == "" {
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
} else {
must.One(t, code)
must.One(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr)
}
})
@@ -1122,8 +1303,9 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
jobID := nonAlphaNum.ReplaceAllString(tc.name, "-")
job := testJob(jobID)
job.Type = pointer.Of(api.JobTypeService)
job.TaskGroups[0].Count = pointer.Of(2)
job.TaskGroups[0].Tasks[0].Config["run_for"] = "10m"
job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"}
job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay
job.TaskGroups[0].Tasks[0].Services = []*api.Service{{
Name: "service",
@@ -1134,7 +1316,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout:\n%s\n\nstderr:\n%s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
ui.OutputWriter.Reset()
// Wait for alloc to be running.
@@ -1155,7 +1341,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
args = append(args, jobID)
code = cmd.Run(args)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout:\n%s\n\nstderr:\n%s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
// Wait for all allocs to restart.
reschedules := map[string]bool{}
@@ -1380,7 +1570,11 @@ func TestJobRestartCommand_filterAllocs(t *testing.T) {
args := append(tc.args, "-verbose", "-yes", "example")
code, err := cmd.parseAndValidate(args)
must.NoError(t, err)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
got := cmd.filterAllocs(allAllocs)
must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool {
@@ -1423,7 +1617,11 @@ func TestJobRestartCommand_onErrorFail(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
ui.OutputWriter.Reset()
// Create a proxy to inject an error after 2 allocation restarts.

View File

@@ -86,7 +86,8 @@ of the exact job ID.
restarted in-place. Since the group is not modified the restart does not
create a new deployment, and so values defined in [`update`][] blocks, such
as [`max_parallel`][], are not taken into account. This option cannot be used
with `-task`.
with `-task`. Only jobs of type `batch`, `service`, and `system` can be
rescheduled.
- `-on-error=<ask|fail>`: Determines what action to take when an error happens
during a restart batch. If `ask` the command stops and waits for user