diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index cda59456b..bd4d9b938 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2981,6 +2981,8 @@ func (t *Task) Copy() *Task {
 
 	if i, err := copystructure.Copy(nt.Config); err != nil {
-		nt.Config = i.(map[string]interface{})
+		panic(err.Error())
+	} else {
+		nt.Config = i.(map[string]interface{})
 	}
 
 	if t.Templates != nil {
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 7e84b35e5..ae80fac50 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -438,6 +438,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	for _, place := range results.place {
 		s.queuedAllocs[place.taskGroup.Name] += 1
 	}
+	for _, destructive := range results.destructiveUpdate {
+		s.queuedAllocs[destructive.placeTaskGroup.Name] += 1
+	}
 
 	// Compute the placements
 	place := make([]placementResult, 0, len(results.place)+len(results.destructiveUpdate))
@@ -482,6 +485,15 @@ func (s *GenericScheduler) computePlacements(place []placementResult) error {
 			return err
 		}
 
+		// Check if we should stop the previous allocation upon successful
+		// placement of its replacement. This allows atomic placements/stops. We
+		// stop the allocation before trying to find a replacement because this
+		// frees the resources currently used by the previous allocation.
+		stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
+		if stopPrevAlloc {
+			s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
+		}
+
 		// Attempt to match the task group
 		var option *RankedNode
 		if preferredNode != nil {
@@ -531,19 +543,20 @@ func (s *GenericScheduler) computePlacements(place []placementResult) error {
 			// Track the placement
 			s.plan.AppendAlloc(alloc)
 
-			// Since we have placed check to see if we should stop any previous
-			// allocation
-			if stop, desc := missing.StopPreviousAlloc(); stop {
-				s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, desc, "")
-			}
-
 		} else {
 			// Lazy initialize the failed map
 			if s.failedTGAllocs == nil {
 				s.failedTGAllocs = make(map[string]*structs.AllocMetric)
 			}
 
+			// Track the fact that we didn't find a placement
 			s.failedTGAllocs[tg.Name] = s.ctx.Metrics()
+
+			// If we weren't able to find a replacement for the allocation, back
+			// out the fact that we asked to stop the allocation.
+			if stopPrevAlloc {
+				s.plan.PopUpdate(missing.PreviousAllocation())
+			}
 		}
 	}
 
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 37ad32f79..031353cf7 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -1481,6 +1481,110 @@ func TestServiceSched_JobModify_Rolling(t *testing.T) {
 	}
 }
 
+func TestJob_CanCopy(t *testing.T) {
+	job := mock.Job()
+	job.Copy()
+}
+
+// This tests that the old allocation is stopped before its replacement is placed.
+func TestServiceSched_JobModify_Rolling_FullNode(t *testing.T) {
+	h := NewHarness(t)
+
+	// Create a node
+	node := mock.Node()
+	noErr(t, h.State.UpsertNode(h.NextIndex(), node))
+
+	resourceAsk := node.Resources.Copy()
+	resourceAsk.CPU -= node.Reserved.CPU
+	resourceAsk.MemoryMB -= node.Reserved.MemoryMB
+	resourceAsk.DiskMB -= node.Reserved.DiskMB
+	resourceAsk.Networks = nil
+
+	// Generate a fake job with one alloc that consumes the whole node
+	job := mock.Job()
+	job.TaskGroups[0].Count = 1
+	job.TaskGroups[0].Tasks[0].Resources = resourceAsk
+	noErr(t, h.State.UpsertJob(h.NextIndex(), job))
+
+	alloc := mock.Alloc()
+	alloc.Resources = resourceAsk
+	alloc.Job = job
+	alloc.JobID = job.ID
+	alloc.NodeID = node.ID
+	alloc.Name = "my-job.web[0]"
+	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
+
+	// Update the job
+	job2 := job.Copy()
+	job2.TaskGroups[0].Update = &structs.UpdateStrategy{
+		MaxParallel:     1,
+		HealthCheck:     structs.UpdateStrategyHealthCheck_Checks,
+		MinHealthyTime:  10 * time.Second,
+		HealthyDeadline: 10 * time.Minute,
+	}
+
+	// Update the task, such that it cannot be done in-place
+	job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
+	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
+
+	// Create a mock evaluation to deal with the job update
+	eval := &structs.Evaluation{
+		ID:          structs.GenerateUUID(),
+		Priority:    50,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+	}
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Ensure a single plan
+	if len(h.Plans) != 1 {
+		t.Fatalf("bad: %#v", h.Plans)
+	}
+	plan := h.Plans[0]
+
+	// Ensure the plan evicted only MaxParallel
+	var update []*structs.Allocation
+	for _, updateList := range plan.NodeUpdate {
+		update = append(update, updateList...)
+	}
+	if len(update) != 1 {
+		t.Fatalf("bad: got %d; want %d: %#v", len(update), 1, plan)
+	}
+
+	// Ensure the plan allocated
+	var planned []*structs.Allocation
+	for _, allocList := range plan.NodeAllocation {
+		planned = append(planned, allocList...)
+	}
+	if len(planned) != 1 {
+		t.Fatalf("bad: %#v", plan)
+	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+
+	// Check that the deployment id is attached to the eval
+	if h.Evals[0].DeploymentID == "" {
+		t.Fatalf("Eval not annotated with deployment id")
+	}
+
+	// Ensure a deployment was created
+	if plan.Deployment == nil {
+		t.Fatalf("bad: %#v", plan)
+	}
+	state, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
+	if !ok {
+		t.Fatalf("bad: %#v", plan)
+	}
+	if state.DesiredTotal != 1 || state.DesiredCanaries != 0 {
+		t.Fatalf("bad: %#v", state)
+	}
+}
+
 func TestServiceSched_JobModify_Canaries(t *testing.T) {
 	h := NewHarness(t)
 
diff --git a/scheduler/util.go b/scheduler/util.go
index e36df13b6..841ffcad2 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -698,25 +698,27 @@ func desiredUpdates(diff *diffResult, inplaceUpdates,
 // adjustQueuedAllocations decrements the number of allocations pending per task
 // group based on the number of allocations successfully placed
 func adjustQueuedAllocations(logger *log.Logger, result *structs.PlanResult, queuedAllocs map[string]int) {
-	if result != nil {
-		for _, allocations := range result.NodeAllocation {
-			for _, allocation := range allocations {
-				// Ensure that the allocation is newly created. We check that
-				// the CreateIndex is equal to the ModifyIndex in order to check
-				// that the allocation was just created. We do not check that
-				// the CreateIndex is equal to the results AllocIndex because
-				// the allocations we get back have gone through the planner's
-				// optimistic snapshot and thus their indexes may not be
-				// correct, but they will be consistent.
-				if allocation.CreateIndex != allocation.ModifyIndex {
-					continue
-				}
+	if result == nil {
+		return
+	}
 
-				if _, ok := queuedAllocs[allocation.TaskGroup]; ok {
-					queuedAllocs[allocation.TaskGroup] -= 1
-				} else {
-					logger.Printf("[ERR] sched: allocation %q placed but not in list of unplaced allocations", allocation.TaskGroup)
-				}
+	for _, allocations := range result.NodeAllocation {
+		for _, allocation := range allocations {
+			// Ensure that the allocation is newly created. We check that
+			// the CreateIndex is equal to the ModifyIndex in order to check
+			// that the allocation was just created. We do not check that
+			// the CreateIndex is equal to the results AllocIndex because
+			// the allocations we get back have gone through the planner's
+			// optimistic snapshot and thus their indexes may not be
+			// correct, but they will be consistent.
+			if allocation.CreateIndex != allocation.ModifyIndex {
+				continue
+			}
+
+			if _, ok := queuedAllocs[allocation.TaskGroup]; ok {
+				queuedAllocs[allocation.TaskGroup] -= 1
+			} else {
+				logger.Printf("[ERR] sched: allocation %q placed but not in list of unplaced allocations", allocation.TaskGroup)
 			}
 		}
 	}
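
Illustration (not part of the patch above): the heart of the scheduler change in computePlacements is a stop-before-place pattern — optimistically append a stop for the previous allocation so its resources are free while a replacement is ranked, then pop that stop back off the plan if no placement is found. The sketch below shows only that pattern with hypothetical stand-in types (toyPlan, placeWithAtomicStop); it is not Nomad's structs.Plan or placementResult API.

package main

import "fmt"

// toyPlan is a hypothetical stand-in for a scheduler plan: it only tracks
// which allocations we asked to stop and which new allocations we placed.
type toyPlan struct {
	stops      []string
	placements []string
}

func (p *toyPlan) AppendUpdate(allocID string) { p.stops = append(p.stops, allocID) }
func (p *toyPlan) AppendAlloc(allocID string)  { p.placements = append(p.placements, allocID) }

// PopUpdate backs out the most recent stop for allocID, mirroring the rollback
// the patch performs when the replacement cannot be placed.
func (p *toyPlan) PopUpdate(allocID string) {
	if n := len(p.stops); n > 0 && p.stops[n-1] == allocID {
		p.stops = p.stops[:n-1]
	}
}

// placeWithAtomicStop stops the previous allocation before attempting the
// placement so its resources are available during ranking, and undoes the
// stop if no replacement could be found.
func placeWithAtomicStop(p *toyPlan, prevAllocID string, tryPlace func() (string, bool)) {
	p.AppendUpdate(prevAllocID)

	if newAllocID, ok := tryPlace(); ok {
		p.AppendAlloc(newAllocID)
		return
	}

	// Placement failed: do not leave the old allocation stopped with nothing
	// to replace it.
	p.PopUpdate(prevAllocID)
}

func main() {
	p := &toyPlan{}
	placeWithAtomicStop(p, "old-1", func() (string, bool) { return "new-1", true })
	placeWithAtomicStop(p, "old-2", func() (string, bool) { return "", false })
	fmt.Printf("stops=%v placements=%v\n", p.stops, p.placements)
	// Prints: stops=[old-1] placements=[new-1]
}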