Removing the queued state of Job Summary and alloc desired status false

This commit is contained in:
Diptanu Choudhury
2016-07-13 13:20:46 -06:00
parent 3c1e657a06
commit 42ba68a201
9 changed files with 29 additions and 157 deletions

View File

@@ -115,23 +115,6 @@ func (m *monitor) update(update *evalState) {
for allocID, alloc := range update.allocs {
if existing, ok := existing.allocs[allocID]; !ok {
switch {
case alloc.desired == structs.AllocDesiredStatusFailed:
// New allocs with desired state failed indicate
// scheduling failure.
m.ui.Output(fmt.Sprintf("Scheduling error for group %q (%s)",
alloc.group, alloc.desiredDesc))
// Log the client status, if any provided
if alloc.clientDesc != "" {
m.ui.Output("Client reported status: " + alloc.clientDesc)
}
// Generate a more descriptive error for why the allocation
// failed and dump it to the screen
if alloc.full != nil {
dumpAllocStatus(m.ui, alloc.full, m.length)
}
case alloc.index < update.index:
// New alloc with create index lower than the eval
// create index indicates modification
@@ -275,18 +258,6 @@ func (m *monitor) monitor(evalID string, allowPrefix bool) int {
clientDesc: alloc.ClientDescription,
index: alloc.CreateIndex,
}
// If we have a scheduling error, query the full allocation
// to get the details.
if alloc.DesiredStatus == structs.AllocDesiredStatusFailed {
schedFailure = true
failed, _, err := m.client.Allocations().Info(alloc.ID, nil)
if err != nil {
m.ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
return 1
}
state.allocs[alloc.ID].full = failed
}
}
// Update the state

View File

@@ -133,68 +133,6 @@ func TestMonitor_Update_Allocs(t *testing.T) {
}
}
func TestMonitor_Update_SchedulingFailure(t *testing.T) {
ui := new(cli.MockUi)
mon := newMonitor(ui, nil, shortId)
// New allocs with desired status failed warns
state := &evalState{
allocs: map[string]*allocState{
"alloc2": &allocState{
id: "87654321-dcba-efab-cdef-123456789abc",
group: "group2",
desired: structs.AllocDesiredStatusFailed,
desiredDesc: "something failed",
client: structs.AllocClientStatusFailed,
clientDesc: "client failed",
index: 1,
// Attach the full failed allocation
full: &api.Allocation{
ID: "87654321-dcba-efab-cdef-123456789abc",
TaskGroup: "group2",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusFailed,
Metrics: &api.AllocationMetric{
NodesEvaluated: 3,
NodesFiltered: 3,
ConstraintFiltered: map[string]int{
"$attr.kernel.name = linux": 3,
},
},
},
},
},
}
mon.update(state)
// Scheduling failure was logged
out := ui.OutputWriter.String()
if !strings.Contains(out, "group2") {
t.Fatalf("missing group\n\n%s", out)
}
if !strings.Contains(out, "Scheduling error") {
t.Fatalf("missing failure\n\n%s", out)
}
if !strings.Contains(out, "something failed") {
t.Fatalf("missing desired desc\n\n%s", out)
}
if !strings.Contains(out, "client failed") {
t.Fatalf("missing client desc\n\n%s", out)
}
// Check that the allocation details were dumped
if !strings.Contains(out, "3/3") {
t.Fatalf("missing filter stats\n\n%s", out)
}
if !strings.Contains(out, structs.AllocDesiredStatusFailed) {
t.Fatalf("missing alloc status\n\n%s", out)
}
if !strings.Contains(out, "$attr.kernel.name = linux") {
t.Fatalf("missing constraint\n\n%s", out)
}
}
func TestMonitor_Update_AllocModification(t *testing.T) {
ui := new(cli.MockUi)
mon := newMonitor(ui, nil, fullId)

View File

@@ -26,7 +26,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@@ -98,7 +98,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@@ -165,7 +165,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@@ -241,7 +241,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
@@ -467,19 +467,19 @@ func TestCoreScheduler_JobGC(t *testing.T) {
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: true,
},
}
@@ -678,7 +678,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: false,
},
{
@@ -690,7 +690,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: true,
},
}

View File

@@ -1227,18 +1227,14 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) erro
}
}
for _, tg := range job.TaskGroups {
if summary, ok := existing.Summary[tg.Name]; !ok {
if _, ok := existing.Summary[tg.Name]; !ok {
newSummary := structs.TaskGroupSummary{
Queued: tg.Count,
Complete: 0,
Failed: 0,
Running: 0,
Starting: 0,
}
existing.Summary[tg.Name] = newSummary
} else if summary.Queued > tg.Count {
summary.Queued = tg.Count
existing.Summary[tg.Name] = summary
}
}
@@ -1274,19 +1270,14 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
}
if existingAlloc == nil {
switch newAlloc.DesiredStatus {
case structs.AllocDesiredStatusFailed:
tgSummary.Failed += 1
case structs.AllocDesiredStatusStop:
tgSummary.Complete += 1
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.DesiredStatus)
}
switch newAlloc.ClientStatus {
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
case structs.AllocClientStatusRunning:
tgSummary.Running += 1
}
if tgSummary.Queued > 0 {
tgSummary.Queued -= 1
case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ClientStatus)
}
} else if existingAlloc.ClientStatus != newAlloc.ClientStatus {
// Incrementing the clint of the bin of the current state

View File

@@ -402,13 +402,10 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
if summary.JobID != job.ID {
t.Fatalf("bad summary id: %v", summary.JobID)
}
webTgSummary, ok := summary.Summary["web"]
_, ok := summary.Summary["web"]
if !ok {
t.Fatalf("nil summary for task group")
}
if webTgSummary.Queued != 10 {
t.Fatalf("wrong summary: %#v", webTgSummary)
}
notify.verify(t)
}
@@ -469,13 +466,10 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
if summary.JobID != job.ID {
t.Fatalf("bad summary id: %v", summary.JobID)
}
webTgSummary, ok := summary.Summary["web"]
_, ok := summary.Summary["web"]
if !ok {
t.Fatalf("nil summary for task group")
}
if webTgSummary.Queued != 10 {
t.Fatalf("wrong summary: %#v", webTgSummary)
}
notify.verify(t)
}
@@ -1083,7 +1077,7 @@ func TestStateStore_RestoreJobSummary(t *testing.T) {
JobID: job.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Queued: 10,
Starting: 10,
},
},
}
@@ -1623,9 +1617,6 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
if tgSummary.Failed != 1 {
t.Fatalf("expected failed: %v, actual: %v, summary: %#v", 1, tgSummary.Failed, tgSummary)
}
if tgSummary.Queued != 9 {
t.Fatalf("expected queued: %v, actual: %v", 9, tgSummary.Running)
}
summary2, err := state.JobSummaryByID(alloc2.JobID)
if err != nil {
@@ -1635,9 +1626,6 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
if tgSummary2.Running != 1 {
t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Failed)
}
if tgSummary2.Queued != 9 {
t.Fatalf("expected queued: %v, actual: %v", 9, tgSummary2.Running)
}
notify.verify(t)
}
@@ -1689,9 +1677,6 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
if !ok {
t.Fatalf("no summary for task group web")
}
if tgSummary.Queued != 9 {
t.Fatalf("expected queued: %v, actual: %v", 9, tgSummary.Queued)
}
if tgSummary.Starting != 1 {
t.Fatalf("expected queued: %v, actual: %v", 1, tgSummary.Starting)
}
@@ -1720,9 +1705,6 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
if tgSummary.Starting != 1 {
t.Fatalf("expected starting: %v, actual: %v", 1, tgSummary.Starting)
}
if tgSummary.Queued != 9 {
t.Fatalf("expected starting: %v, actual: %v", 9, tgSummary.Queued)
}
alloc2 := mock.Alloc()
alloc2.ID = alloc.ID
@@ -1774,9 +1756,6 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
if tgSummary.Starting != 1 {
t.Fatalf("expected starting: %v, actual: %v", 1, tgSummary.Starting)
}
if tgSummary.Queued != 9 {
t.Fatalf("expected starting: %v, actual: %v", 9, tgSummary.Queued)
}
notify.verify(t)
}
@@ -2338,7 +2317,7 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) {
t.Fatalf("err: %v", err)
}
summary, _ := state.JobSummaryByID(job.ID)
if summary.Summary["web"].Queued != 2 && summary.Summary["web"].Starting != 1 {
if summary.Summary["web"].Starting != 1 {
t.Fatalf("bad job summary: %v", summary)
}
@@ -2427,7 +2406,7 @@ func TestJobSummary_UpdateClientStatus(t *testing.T) {
t.Fatalf("err: %v", err)
}
summary, _ := state.JobSummaryByID(job.ID)
if summary.Summary["web"].Queued != 0 || summary.Summary["web"].Starting != 3 {
if summary.Summary["web"].Starting != 3 {
t.Fatalf("bad job summary: %v", summary)
}
@@ -2453,7 +2432,7 @@ func TestJobSummary_UpdateClientStatus(t *testing.T) {
t.Fatalf("err: %v", err)
}
summary, _ = state.JobSummaryByID(job.ID)
if summary.Summary["web"].Queued != 0 || summary.Summary["web"].Running != 1 || summary.Summary["web"].Failed != 1 || summary.Summary["web"].Complete != 1 {
if summary.Summary["web"].Running != 1 || summary.Summary["web"].Failed != 1 || summary.Summary["web"].Complete != 1 {
t.Fatalf("bad job summary: %v", summary)
}
@@ -2465,7 +2444,7 @@ func TestJobSummary_UpdateClientStatus(t *testing.T) {
t.Fatalf("err: %v", err)
}
summary, _ = state.JobSummaryByID(job.ID)
if summary.Summary["web"].Queued != 0 || summary.Summary["web"].Starting != 1 || summary.Summary["web"].Running != 1 || summary.Summary["web"].Failed != 1 || summary.Summary["web"].Complete != 1 {
if summary.Summary["web"].Starting != 1 || summary.Summary["web"].Running != 1 || summary.Summary["web"].Failed != 1 || summary.Summary["web"].Complete != 1 {
t.Fatalf("bad job summary: %v", summary)
}
}

View File

@@ -950,7 +950,6 @@ type JobSummary struct {
// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
@@ -958,11 +957,6 @@ type TaskGroupSummary struct {
Lost int
}
// Total returns the total number of allocations for the task group.
func (s *TaskGroupSummary) Total() int {
return s.Queued + s.Complete + s.Failed + s.Running + s.Starting
}
// Job is the scope of a scheduling request to Nomad. It is the largest
// scoped object, and is a named collection of task groups. Each task group
// is further composed of tasks. A task group (TG) is the unit of scheduling
@@ -2323,10 +2317,9 @@ func (c *Constraint) Validate() error {
}
const (
AllocDesiredStatusRun = "run" // Allocation should run
AllocDesiredStatusStop = "stop" // Allocation should stop
AllocDesiredStatusEvict = "evict" // Allocation should stop, and was evicted
AllocDesiredStatusFailed = "failed" // Allocation failed to be done
AllocDesiredStatusRun = "run" // Allocation should run
AllocDesiredStatusStop = "stop" // Allocation should stop
AllocDesiredStatusEvict = "evict" // Allocation should stop, and was evicted
)
const (
@@ -2435,7 +2428,7 @@ func (a *Allocation) TerminalStatus() bool {
// First check the desired state and if that isn't terminal, check client
// state.
switch a.DesiredStatus {
case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
case AllocDesiredStatusStop, AllocDesiredStatusEvict:
return true
default:
}

View File

@@ -273,7 +273,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []
// status is failed so that they will be replaced. If they are
// complete but not failed, they shouldn't be replaced.
switch a.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict, structs.AllocDesiredStatusFailed:
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
return !a.RanSuccessfully()
default:
}

View File

@@ -644,7 +644,7 @@ func TestServiceSched_JobModify(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
terminal = append(terminal, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal))
@@ -833,7 +833,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
terminal = append(terminal, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal))

View File

@@ -304,7 +304,7 @@ func TestSystemSched_JobModify(t *testing.T) {
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = "my-job.web[0]"
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
terminal = append(terminal, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal))