Mirror of https://github.com/kemko/nomad.git, synced 2026-01-10 12:25:42 +03:00
Merge pull request #1516 from hashicorp/f-lost-state-sched
Make scheduler mark allocations as lost
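At a high level, this change makes the schedulers treat allocations on nodes that are down or no longer registered as lost, rather than only handling the draining case: diffAllocs now returns a separate lost set, the plan records those allocations with a stopped desired status and a lost client status, and replacements are placed for them. The following standalone Go sketch illustrates the classification decision only; Node, classify, and the status strings here are simplified placeholders, not Nomad's real types.

package main

import "fmt"

// Node is a simplified stand-in for structs.Node.
type Node struct {
	Exists bool // false models a node that is no longer in state
	Down   bool // models structs.NodeStatusDown
	Drain  bool
}

// classify mirrors the new behaviour for an alloc whose node is tainted:
// lost when the node is gone or down, migrate when it is merely draining.
func classify(n Node) string {
	switch {
	case !n.Exists || n.Down:
		return "lost" // plan: desired=stop, client=lost, then place a replacement
	case n.Drain:
		return "migrate"
	default:
		return "ignore"
	}
}

func main() {
	fmt.Println(classify(Node{}))                          // lost (node gone)
	fmt.Println(classify(Node{Exists: true, Down: true}))  // lost (node down)
	fmt.Println(classify(Node{Exists: true, Drain: true})) // migrate
}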
@@ -29,7 +29,14 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.JobID = eval.JobID
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
alloc2.JobID = eval.JobID
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

@@ -68,6 +75,14 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
if outA != nil {
t.Fatalf("bad: %v", outA)
}

outA2, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 != nil {
t.Fatalf("bad: %v", outA2)
}
}

// An EvalGC should never reap a batch job

@@ -101,7 +116,15 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

@@ -141,6 +164,14 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
t.Fatalf("bad: %v", outA)
}

outA2, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}

outB, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)

@@ -170,16 +201,24 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.JobID = alloc.JobID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "running" alloc
alloc2 := mock.Alloc()
alloc2.EvalID = eval.ID
state.UpsertJobSummary(1003, mock.JobSummary(alloc2.JobID))
err = state.UpsertAllocs(1004, []*structs.Allocation{alloc2})
alloc3 := mock.Alloc()
alloc3.EvalID = eval.ID
state.UpsertJobSummary(1003, mock.JobSummary(alloc3.JobID))
err = state.UpsertAllocs(1004, []*structs.Allocation{alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}

@@ -211,7 +250,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
t.Fatalf("bad: %v", out)
}

outA, err := state.AllocByID(alloc2.ID)
outA, err := state.AllocByID(alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}

@@ -227,6 +266,14 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
if outB != nil {
t.Fatalf("bad: %v", outB)
}

outC, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outC != nil {
t.Fatalf("bad: %v", outC)
}
}

func TestCoreScheduler_EvalGC_Force(t *testing.T) {
@@ -228,30 +228,6 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
return fmt.Errorf("index update failed: %v", err)
}

// Update the state of the allocations which are in running state to lost
if status == structs.NodeStatusDown {
allocs, err := s.AllocsByNode(nodeID)
if err != nil {
return fmt.Errorf("error retrieving any allocations for the node: %v", nodeID)
}
for _, alloc := range allocs {
copyAlloc := alloc.Copy()
if alloc.ClientStatus == structs.AllocClientStatusPending ||
alloc.ClientStatus == structs.AllocClientStatusRunning {
copyAlloc.ClientStatus = structs.AllocClientStatusLost

// Updating the summary since we are changing the state of the
// allocation to lost
if err := s.updateSummaryWithAlloc(index, copyAlloc, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
}
}
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil

@@ -963,8 +939,13 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
alloc.AllocModifyIndex = index
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription

// If the scheduler is marking this allocation as lost we do not
// want to reuse the status of the existing allocation.
if alloc.ClientStatus != structs.AllocClientStatusLost {
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription
}

// The job has been denormalized so re-attach the original job
if alloc.Job == nil {
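The UpsertAllocs hunk above reduces to a small merge rule: when an allocation already exists, the server keeps the client-reported status, except when the incoming write marks the allocation lost, in which case the lost status is kept. A minimal sketch of that rule, using a stripped-down Allocation type as an illustrative stand-in for structs.Allocation:

// mergeClientStatus applies the new rule from UpsertAllocs: keep the existing
// client status/description unless the scheduler is marking the alloc lost.
type Allocation struct {
	ClientStatus      string
	ClientDescription string
}

const AllocClientStatusLost = "lost"

func mergeClientStatus(update, exist *Allocation) {
	if update.ClientStatus != AllocClientStatusLost {
		update.ClientStatus = exist.ClientStatus
		update.ClientDescription = exist.ClientDescription
	}
}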
@@ -136,114 +136,6 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
t.Fatalf("bad: %d", index)
}

alloc := mock.Alloc()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc.NodeID = node.ID
alloc1.NodeID = node.ID
alloc2.NodeID = node.ID
alloc.ClientStatus = structs.AllocClientStatusPending
alloc1.ClientStatus = structs.AllocClientStatusPending
alloc2.ClientStatus = structs.AllocClientStatusPending

if err := state.UpsertJob(850, alloc.Job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJob(851, alloc1.Job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJob(852, alloc2.Job); err != nil {
t.Fatal(err)
}
if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

// Change the state of the allocs to running and failed
newAlloc := alloc.Copy()
newAlloc.ClientStatus = structs.AllocClientStatusRunning

newAlloc1 := alloc1.Copy()
newAlloc1.ClientStatus = structs.AllocClientStatusFailed

if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil {
t.Fatalf("err: %v", err)
}

// Change the state of the node to down
if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil {
t.Fatalf("err: %v", err)
}

allocOut, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if allocOut.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, allocOut.ClientStatus)
}

alloc1Out, err := state.AllocByID(alloc1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if alloc1Out.ClientStatus != structs.AllocClientStatusFailed {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusFailed, alloc1Out.ClientStatus)
}

alloc2Out, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if alloc2Out.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, alloc2Out.ClientStatus)
}

js1, _ := state.JobSummaryByID(alloc.JobID)
js2, _ := state.JobSummaryByID(alloc1.JobID)
js3, _ := state.JobSummaryByID(alloc2.JobID)

expectedSummary1 := structs.JobSummary{
JobID: alloc.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Lost: 1,
},
},
CreateIndex: 850,
ModifyIndex: 1004,
}
expectedSummary2 := structs.JobSummary{
JobID: alloc1.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Failed: 1,
},
},
CreateIndex: 851,
ModifyIndex: 1003,
}
expectedSummary3 := structs.JobSummary{
JobID: alloc2.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Lost: 1,
},
},
CreateIndex: 852,
ModifyIndex: 1004,
}

if !reflect.DeepEqual(js1, &expectedSummary1) {
t.Fatalf("expected: %v, got: %v", expectedSummary1, js1)
}
if !reflect.DeepEqual(js2, &expectedSummary2) {
t.Fatalf("expected: %v, got: %#v", expectedSummary2, js2)
}
if !reflect.DeepEqual(js3, &expectedSummary3) {
t.Fatalf("expected: %v, got: %v", expectedSummary3, js3)
}

notify.verify(t)
}
@@ -1898,6 +1790,39 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
notify.verify(t)
}

// This test ensures that the state store will mark the client's status as lost
// when set rather than preferring the existing status.
func TestStateStore_UpdateAlloc_Lost(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc.ClientStatus = "foo"

if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

alloc2 := new(structs.Allocation)
*alloc2 = *alloc
alloc2.ClientStatus = structs.AllocClientStatusLost
if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

out, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}

if out.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("bad: %#v", out)
}
}

// This test ensures an allocation can be updated when there is no job
// associated with it. This will happen when a job is stopped by a user which
// has non-terminal allocations on clients
@@ -2537,7 +2537,7 @@ func (a *Allocation) TerminalStatus() bool {
}

switch a.ClientStatus {
case AllocClientStatusComplete, AllocClientStatusFailed:
case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
return true
default:
return false
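Counting AllocClientStatusLost as terminal is what lets the rest of the system, such as the eval garbage collector exercised in the tests earlier in this diff, treat lost allocations like completed or failed ones. A small sketch of the resulting check; the constant values are written out here as assumptions for illustration:

const (
	AllocClientStatusComplete = "complete"
	AllocClientStatusFailed   = "failed"
	AllocClientStatusLost     = "lost"
)

// terminal mirrors the updated switch in Allocation.TerminalStatus: lost now
// counts as a terminal client status alongside complete and failed.
func terminal(clientStatus string) bool {
	switch clientStatus {
	case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
		return true
	default:
		return false
	}
}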
@@ -3022,7 +3022,9 @@ type Plan struct {
Annotations *PlanAnnotations
}

func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
// AppendUpdate marks the allocation for eviction. The clientStatus of the
// allocation may be optionally set by passing in a non-empty value.
func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) {
newAlloc := new(Allocation)
*newAlloc = *alloc

@@ -3038,8 +3040,13 @@ func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil

newAlloc.DesiredStatus = status
newAlloc.DesiredDescription = desc
newAlloc.DesiredStatus = desiredStatus
newAlloc.DesiredDescription = desiredDesc

if clientStatus != "" {
newAlloc.ClientStatus = clientStatus
}

node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
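With the widened signature, callers that only evict keep passing an empty client status, while callers handling lost allocations pass the lost status explicitly. The sketch below uses trimmed-down stand-ins for structs.Allocation and structs.Plan to show the intended calling pattern; it is illustrative, not the real implementation.

package main

import "fmt"

// Allocation and Plan are simplified stand-ins for the real structs.
type Allocation struct {
	NodeID             string
	DesiredStatus      string
	DesiredDescription string
	ClientStatus       string
}

type Plan struct {
	NodeUpdate map[string][]*Allocation
}

// AppendUpdate follows the new four-argument shape: the client status is
// only overridden when a non-empty value is passed in.
func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) {
	newAlloc := *alloc
	newAlloc.DesiredStatus = desiredStatus
	newAlloc.DesiredDescription = desiredDesc
	if clientStatus != "" {
		newAlloc.ClientStatus = clientStatus
	}
	p.NodeUpdate[alloc.NodeID] = append(p.NodeUpdate[alloc.NodeID], &newAlloc)
}

func main() {
	p := &Plan{NodeUpdate: map[string][]*Allocation{}}
	a := &Allocation{NodeID: "node1", ClientStatus: "running"}
	p.AppendUpdate(a, "stop", "alloc not needed", "") // plain eviction
	p.AppendUpdate(a, "stop", "alloc is lost since its node is down", "lost")
	for _, u := range p.NodeUpdate["node1"] {
		fmt.Println(u.DesiredStatus, u.ClientStatus) // "stop running", then "stop lost"
	}
}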
@@ -26,6 +26,9 @@ const (
// allocUpdating is the status used when a job requires an update
allocUpdating = "alloc is being updated due to job update"

// allocLost is the status used when an allocation is lost
allocLost = "alloc is lost since its node is down"

// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"

@@ -362,7 +365,7 @@ func (s *GenericScheduler) computeJobAllocs() error {

// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
}

// Attempt to do the upgrades in place

@@ -376,7 +379,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate)
limit := len(diff.update) + len(diff.migrate) + len(diff.lost)
if s.job != nil && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel
}
@@ -387,6 +390,10 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Treat non in-place updates as an eviction and new placement.
s.limitReached = s.limitReached || evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit)

// Lost allocations should be transitioned to desired status stop and client
// status lost and a new placement should be made
s.limitReached = s.limitReached || markLostAndPlace(s.ctx, diff, diff.lost, allocLost, &limit)

// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
if s.job != nil {
@@ -1253,6 +1253,15 @@ func TestServiceSched_NodeDown(t *testing.T) {
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}

// Cover each terminal case and ensure it doesn't change to lost
allocs[7].DesiredStatus = structs.AllocDesiredStatusRun
allocs[7].ClientStatus = structs.AllocClientStatusLost
allocs[8].DesiredStatus = structs.AllocDesiredStatusRun
allocs[8].ClientStatus = structs.AllocClientStatusFailed
allocs[9].DesiredStatus = structs.AllocDesiredStatusRun
allocs[9].ClientStatus = structs.AllocClientStatusComplete

noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

// Mark some allocs as running

@@ -1280,13 +1289,19 @@ func TestServiceSched_NodeDown(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Test the correctness of the old allocation states
for _, alloc := range allocs {
out, err := h.State.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.ClientStatus != structs.AllocClientStatusLost || out.DesiredStatus != structs.AllocDesiredStatusStop {
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]

// Test the scheduler marked all non-terminal allocations as lost
if len(plan.NodeUpdate[node.ID]) != 7 {
t.Fatalf("bad: %#v", plan)
}

for _, out := range plan.NodeUpdate[node.ID] {
if out.ClientStatus != structs.AllocClientStatusLost && out.DesiredStatus != structs.AllocDesiredStatusStop {
t.Fatalf("bad alloc: %#v", out)
}
}
@@ -200,7 +200,13 @@ func (s *SystemScheduler) computeJobAllocs() error {

// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
}

// Lost allocations should be transitioned to desired status stop and client
// status lost.
for _, e := range diff.lost {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
}

// Attempt to do the upgrades in place
@@ -689,6 +689,69 @@ func TestSystemSched_JobDeregister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_NodeDown(t *testing.T) {
h := NewHarness(t)

// Register a down node
node := mock.Node()
node.Status = structs.NodeStatusDown
noErr(t, h.State.UpsertNode(h.NextIndex(), node))

// Generate a fake job allocated on that node.
job := mock.SystemJob()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))

// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
}

// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}

// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]

// Ensure the plan evicted all allocs
if len(plan.NodeUpdate[node.ID]) != 1 {
t.Fatalf("bad: %#v", plan)
}

// Ensure the plan updated the allocation.
var planned []*structs.Allocation
for _, allocList := range plan.NodeUpdate {
planned = append(planned, allocList...)
}
if len(planned) != 1 {
t.Fatalf("bad: %#v", plan)
}

// Ensure the allocation is stopped
if p := planned[0]; p.DesiredStatus != structs.AllocDesiredStatusStop &&
p.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("bad: %#v", planned[0])
}

h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_NodeDrain(t *testing.T) {
h := NewHarness(t)
@@ -744,13 +807,9 @@ func TestSystemSched_NodeDrain(t *testing.T) {
t.Fatalf("bad: %#v", plan)
}

// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)

// Ensure the allocation is stopped
if planned[0].DesiredStatus != structs.AllocDesiredStatusStop {
t.Fatalf("bad: %#v", out)
t.Fatalf("bad: %#v", planned[0])
}

h.AssertEvalStatus(t, structs.EvalStatusComplete)
@@ -35,12 +35,12 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {

// diffResult is used to return the sets that result from the diff
type diffResult struct {
place, update, migrate, stop, ignore []allocTuple
place, update, migrate, stop, ignore, lost []allocTuple
}

func (d *diffResult) GoString() string {
return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d)",
len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore))
return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d) (lost %d)",
len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore), len(d.lost))
}

func (d *diffResult) Append(other *diffResult) {

@@ -49,15 +49,17 @@ func (d *diffResult) Append(other *diffResult) {
d.migrate = append(d.migrate, other.migrate...)
d.stop = append(d.stop, other.stop...)
d.ignore = append(d.ignore, other.ignore...)
d.lost = append(d.lost, other.lost...)
}

// diffAllocs is used to do a set difference between the target allocations
// and the existing allocations. This returns 5 sets of results, the list of
// and the existing allocations. This returns 6 sets of results, the list of
// named task groups that need to be placed (no existing allocation), the
// allocations that need to be updated (job definition is newer), allocs that
// need to be migrated (node is draining), the allocs that need to be evicted
// (no longer required), and those that should be ignored.
func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
// (no longer required), those that should be ignored and those that are lost
// that need to be replaced (running on a lost node).
func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
required map[string]*structs.TaskGroup, allocs []*structs.Allocation) *diffResult {
result := &diffResult{}
@@ -83,20 +85,30 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,

// If we are on a tainted node, we must migrate if we are a service or
// if the batch allocation did not finish
if taintedNodes[exist.NodeID] {
if node, ok := taintedNodes[exist.NodeID]; ok {
// If the job is batch and finished successfully, the fact that the
// node is tainted does not mean it should be migrated as the work
// was already successfully finished. However for service/system
// jobs, tasks should never complete. The check of batch type,
// defends against client bugs.
// node is tainted does not mean it should be migrated or marked as
// lost as the work was already successfully finished. However for
// service/system jobs, tasks should never complete. The check of
// batch type, defends against client bugs.
if exist.Job.Type == structs.JobTypeBatch && exist.RanSuccessfully() {
goto IGNORE
}
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})

if node == nil || node.TerminalStatus() {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
} else {
// This is the drain case
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
}
continue
}
@@ -139,7 +151,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,

// diffSystemAllocs is like diffAllocs however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on.
func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]bool,
func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node,
allocs []*structs.Allocation) *diffResult {

// Build a mapping of nodes to all their allocs.
@@ -254,8 +266,9 @@ func progressMade(result *structs.PlanResult) bool {

// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]bool, error) {
out := make(map[string]bool)
// All the nodes returned in the map are tainted.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) {
out := make(map[string]*structs.Node)
for _, alloc := range allocs {
if _, ok := out[alloc.NodeID]; ok {
continue

@@ -268,11 +281,12 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]bool, e

// If the node does not exist, we should migrate
if node == nil {
out[alloc.NodeID] = true
out[alloc.NodeID] = nil
continue
}

out[alloc.NodeID] = structs.ShouldDrainNode(node.Status) || node.Drain
if structs.ShouldDrainNode(node.Status) || node.Drain {
out[alloc.NodeID] = node
}
}
return out, nil
}
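A detail worth calling out in the new taintedNodes contract: the returned map can hold a nil value. A nil entry means the node record no longer exists in state, while a non-nil entry is a draining or down node; nodes that are neither are simply absent from the map. A short hedged sketch of how a caller would read that map, with Node as a placeholder rather than structs.Node and "down" standing in for the node status constant:

type Node struct {
	Drain  bool
	Status string
}

// describeTainted interprets one entry from the map returned by the new
// taintedNodes signature.
func describeTainted(tainted map[string]*Node, nodeID string) string {
	node, ok := tainted[nodeID]
	if !ok {
		return "healthy" // not tainted at all
	}
	if node == nil || node.Status == "down" {
		return "lost" // node gone or terminal: allocs on it are lost
	}
	return "migrate" // draining node: allocs should be migrated
}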
@@ -426,7 +440,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
allocInPlace)
allocInPlace, "")

// Attempt to match the task group
option, _ := stack.Select(update.TaskGroup)
@@ -479,7 +493,25 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc)
ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, "")
diff.place = append(diff.place, a)
}
if n <= *limit {
*limit -= n
return false
}
*limit = 0
return true
}

// markLostAndPlace is used to mark allocations as lost and add them to the
// placement queue. markLostAndPlace modifies both the diffResult and the
// limit. It returns true if the limit has been reached.
func markLostAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool {
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, structs.AllocClientStatusLost)
diff.place = append(diff.place, a)
}
if n <= *limit {
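Both evictAndPlace and markLostAndPlace share the same limit bookkeeping: the limit starts at len(diff.update) + len(diff.migrate) + len(diff.lost), or MaxParallel under a rolling update, each helper consumes up to the remaining limit, and a true return signals to the scheduler that the rolling-update cap was hit. A small worked sketch of that accounting, written independently of the scheduler types:

// consume models the shared limit logic: handle at most *limit of the n
// pending items, decrement the limit, and report whether the cap was hit.
func consume(n int, limit *int) (processed int, capped bool) {
	if n <= *limit {
		*limit -= n
		return n, false
	}
	processed = *limit
	*limit = 0
	return processed, true
}

// Example with MaxParallel = 3, two updates and two lost allocs: the updates
// take two slots, so the lost set only gets one slot this evaluation.
//   limit := 3
//   consume(2, &limit) // -> 2, false (limit now 1)
//   consume(2, &limit) // -> 1, true  (limit now 0)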
@@ -47,9 +47,15 @@ func TestDiffAllocs(t *testing.T) {
*oldJob = *job
oldJob.JobModifyIndex -= 1

tainted := map[string]bool{
"dead": true,
"zip": false,
drainNode := mock.Node()
drainNode.Drain = true

deadNode := mock.Node()
deadNode.Status = structs.NodeStatusDown

tainted := map[string]*structs.Node{
"dead": deadNode,
"drainNode": drainNode,
}

allocs := []*structs.Allocation{

@@ -80,10 +86,17 @@ func TestDiffAllocs(t *testing.T) {
// Migrate the 3rd
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "dead",
NodeID: "drainNode",
Name: "my-job.web[2]",
Job: oldJob,
},
// Mark the 4th lost
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "dead",
Name: "my-job.web[3]",
Job: oldJob,
},
}

diff := diffAllocs(job, tainted, required, allocs)

@@ -92,6 +105,7 @@ func TestDiffAllocs(t *testing.T) {
migrate := diff.migrate
stop := diff.stop
ignore := diff.ignore
lost := diff.lost

// We should update the first alloc
if len(update) != 1 || update[0].Alloc != allocs[0] {

@@ -113,8 +127,13 @@ func TestDiffAllocs(t *testing.T) {
t.Fatalf("bad: %#v", migrate)
}

// We should place 7
if len(place) != 7 {
// We should mark the 5th alloc as lost
if len(lost) != 1 || lost[0].Alloc != allocs[4] {
t.Fatalf("bad: %#v", migrate)
}

// We should place 6
if len(place) != 6 {
t.Fatalf("bad: %#v", place)
}
}
@@ -130,9 +149,15 @@ func TestDiffSystemAllocs(t *testing.T) {
*oldJob = *job
oldJob.JobModifyIndex -= 1

tainted := map[string]bool{
"dead": true,
"baz": false,
drainNode := mock.Node()
drainNode.Drain = true

deadNode := mock.Node()
deadNode.Status = structs.NodeStatusDown

tainted := map[string]*structs.Node{
"dead": deadNode,
"drainNode": drainNode,
}

allocs := []*structs.Allocation{

@@ -152,7 +177,14 @@ func TestDiffSystemAllocs(t *testing.T) {
Job: job,
},

// Stop allocation on dead.
// Stop allocation on draining node.
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "drainNode",
Name: "my-job.web[0]",
Job: oldJob,
},
// Mark as lost on a dead node
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "dead",

@@ -167,6 +199,7 @@ func TestDiffSystemAllocs(t *testing.T) {
migrate := diff.migrate
stop := diff.stop
ignore := diff.ignore
lost := diff.lost

// We should update the first alloc
if len(update) != 1 || update[0].Alloc != allocs[0] {

@@ -188,6 +221,11 @@ func TestDiffSystemAllocs(t *testing.T) {
t.Fatalf("bad: %#v", migrate)
}

// We should mark the 5th alloc as lost
if len(lost) != 1 || lost[0].Alloc != allocs[3] {
t.Fatalf("bad: %#v", migrate)
}

// We should place 1
if len(place) != 1 {
t.Fatalf("bad: %#v", place)
@@ -309,13 +347,26 @@ func TestTaintedNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}

if len(tainted) != 5 {
if len(tainted) != 3 {
t.Fatalf("bad: %v", tainted)
}
if tainted[node1.ID] || tainted[node2.ID] {

if _, ok := tainted[node1.ID]; ok {
t.Fatalf("Bad: %v", tainted)
}
if !tainted[node3.ID] || !tainted[node4.ID] || !tainted["12345678-abcd-efab-cdef-123456789abc"] {
if _, ok := tainted[node2.ID]; ok {
t.Fatalf("Bad: %v", tainted)
}

if node, ok := tainted[node3.ID]; !ok || node == nil {
t.Fatalf("Bad: %v", tainted)
}

if node, ok := tainted[node4.ID]; !ok || node == nil {
t.Fatalf("Bad: %v", tainted)
}

if node, ok := tainted["12345678-abcd-efab-cdef-123456789abc"]; !ok || node != nil {
t.Fatalf("Bad: %v", tainted)
}
}