Merge pull request #663 from hashicorp/f-job-status

Populate Job Status
This commit is contained in:
Alex Dadgar
2016-01-12 09:58:16 -08:00
7 changed files with 396 additions and 15 deletions

View File

@@ -532,6 +532,7 @@ func TestJobEndpoint_GetJob(t *testing.T) {
}
job.CreateIndex = resp.JobModifyIndex
job.ModifyIndex = resp.JobModifyIndex
job.JobModifyIndex = resp.JobModifyIndex
// Lookup the job
get := &structs.JobSpecificRequest{

View File

@@ -124,9 +124,10 @@ func Job() *structs.Job {
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.InitFields()
return job

View File

@@ -295,9 +295,26 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
if existing != nil {
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index
job.JobModifyIndex = index
// Compute the job status
var err error
job.Status, err = s.getJobStatus(txn, job, false)
if err != nil {
return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
}
} else {
job.CreateIndex = index
job.ModifyIndex = index
job.JobModifyIndex = index
// If we are inserting the job for the first time, we don't need to
// calculate the jobs status as it is known.
if job.IsPeriodic() {
job.Status = structs.JobStatusRunning
} else {
job.Status = structs.JobStatusPending
}
}
// Insert the job
@@ -524,11 +541,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
watcher.Add(watch.Item{Table: "evals"})
// Do a nested upsert
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}
jobs[eval.JobID] = ""
}
// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
@@ -571,6 +596,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
watcher.Add(watch.Item{Table: "evals"})
watcher.Add(watch.Item{Table: "allocs"})
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
if err != nil {
@@ -583,6 +609,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("eval delete failed: %v", err)
}
watcher.Add(watch.Item{Eval: eval})
jobs[existing.(*structs.Evaluation).JobID] = ""
}
for _, alloc := range allocs {
@@ -611,6 +638,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
return fmt.Errorf("index update failed: %v", err)
}
// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
@@ -726,6 +758,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
return fmt.Errorf("index update failed: %v", err)
}
// Set the job's status
forceStatus := ""
if !copyAlloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs := map[string]string{alloc.JobID: forceStatus}
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
@@ -741,6 +783,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
watcher.Add(watch.Item{Table: "allocs"})
// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
@@ -761,6 +804,13 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("alloc insert failed: %v", err)
}
// If the allocation is running, force the job to running status.
forceStatus := ""
if !alloc.TerminalStatus() {
forceStatus = structs.JobStatusRunning
}
jobs[alloc.JobID] = forceStatus
watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
@@ -772,6 +822,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("index update failed: %v", err)
}
// Set the job's status
if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil {
return fmt.Errorf("setting job status failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
@@ -906,6 +961,117 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
return iter, nil
}
// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
// It takes a map of job IDs to an optional forceStatus string. It returns an
// error if the job doesn't exist or setJobStatus fails.
func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn,
jobs map[string]string, evalDelete bool) error {
for job, forceStatus := range jobs {
existing, err := txn.First("jobs", "id", job)
if err != nil {
return fmt.Errorf("job lookup failed: %v", err)
}
if existing == nil {
continue
}
if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
return err
}
}
return nil
}
// setJobStatus sets the status of the job by looking up associated evaluations
// and allocations. evalDelete should be set to true if setJobStatus is being
// called because an evaluation is being deleted (potentially because of garbage
// collection). If forceStatus is non-empty, the job's status will be set to the
// passed status.
func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn,
job *structs.Job, evalDelete bool, forceStatus string) error {
// Capture the current status so we can check if there is a change
oldStatus := job.Status
newStatus := forceStatus
// If forceStatus is not set, compute the jobs status.
if forceStatus == "" {
var err error
newStatus, err = s.getJobStatus(txn, job, evalDelete)
if err != nil {
return err
}
}
// Fast-path if nothing has changed.
if oldStatus == newStatus {
return nil
}
// The job has changed, so add to watcher.
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: job.ID})
// Copy and update the existing job
updated := job.Copy()
updated.Status = newStatus
updated.ModifyIndex = index
// Insert the job
if err := txn.Insert("jobs", updated); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) {
allocs, err := txn.Get("allocs", "job", job.ID)
if err != nil {
return "", err
}
// If there is a non-terminal allocation, the job is running.
hasAlloc := false
for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() {
hasAlloc = true
if !alloc.(*structs.Allocation).TerminalStatus() {
return structs.JobStatusRunning, nil
}
}
evals, err := txn.Get("evals", "job", job.ID)
if err != nil {
return "", err
}
hasEval := false
for eval := evals.Next(); eval != nil; eval = evals.Next() {
hasEval = true
if !eval.(*structs.Evaluation).TerminalStatus() {
return structs.JobStatusPending, nil
}
}
// The job is dead if all the allocations and evals are terminal or if there
// are no evals because of garbage collection.
if evalDelete || hasEval || hasAlloc {
return structs.JobStatusDead, nil
}
// If there are no allocations or evaluations it is a new job. If the job is
// periodic, we mark it as running as it will never have an
// allocation/evaluation against it.
if job.IsPeriodic() {
return structs.JobStatusRunning, nil
}
return structs.JobStatusPending, nil
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore

View File

@@ -1748,6 +1748,222 @@ func TestStateStore_RestoreAlloc(t *testing.T) {
notify.verify(t)
}
func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) {
state := testStateStore(t)
watcher := watch.NewItems()
txn := state.db.Txn(true)
// Create and insert a mock job.
job := mock.Job()
job.Status = ""
job.ModifyIndex = 0
if err := txn.Insert("jobs", job); err != nil {
t.Fatalf("job insert failed: %v", err)
}
exp := "foobar"
index := uint64(1000)
if err := state.setJobStatus(index, watcher, txn, job, false, exp); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
i, err := txn.First("jobs", "id", job.ID)
if err != nil {
t.Fatalf("job lookup failed: %v", err)
}
updated := i.(*structs.Job)
if updated.Status != exp {
t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, exp)
}
if updated.ModifyIndex != index {
t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index)
}
}
func TestStateStore_SetJobStatus_NoOp(t *testing.T) {
state := testStateStore(t)
watcher := watch.NewItems()
txn := state.db.Txn(true)
// Create and insert a mock job that should be pending.
job := mock.Job()
job.Status = structs.JobStatusPending
job.ModifyIndex = 10
if err := txn.Insert("jobs", job); err != nil {
t.Fatalf("job insert failed: %v", err)
}
index := uint64(1000)
if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
i, err := txn.First("jobs", "id", job.ID)
if err != nil {
t.Fatalf("job lookup failed: %v", err)
}
updated := i.(*structs.Job)
if updated.ModifyIndex == index {
t.Fatalf("setJobStatus() should have been a no-op")
}
}
func TestStateStore_SetJobStatus(t *testing.T) {
state := testStateStore(t)
watcher := watch.NewItems()
txn := state.db.Txn(true)
// Create and insert a mock job that should be pending but has an incorrect
// status.
job := mock.Job()
job.Status = "foobar"
job.ModifyIndex = 10
if err := txn.Insert("jobs", job); err != nil {
t.Fatalf("job insert failed: %v", err)
}
index := uint64(1000)
if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil {
t.Fatalf("setJobStatus() failed: %v", err)
}
i, err := txn.First("jobs", "id", job.ID)
if err != nil {
t.Fatalf("job lookup failed: %v", err)
}
updated := i.(*structs.Job)
if updated.Status != structs.JobStatusPending {
t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, structs.JobStatusPending)
}
if updated.ModifyIndex != index {
t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index)
}
}
func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) {
job := mock.Job()
state := testStateStore(t)
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusPending {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending)
}
}
func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) {
job := mock.PeriodicJob()
state := testStateStore(t)
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusRunning {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning)
}
}
func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) {
job := mock.Job()
state := testStateStore(t)
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusDead {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead)
}
}
func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
// Create a mock alloc that is dead.
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// Create a mock eval that is complete
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, false)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusDead {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead)
}
}
func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
// Create a mock alloc that is running.
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusRunning {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning)
}
}
func TestStateStore_SetJobStatus_PendingEval(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
// Create a mock eval that is pending.
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusPending
if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
txn := state.db.Txn(false)
status, err := state.getJobStatus(txn, job, true)
if err != nil {
t.Fatalf("getJobStatus() failed: %v", err)
}
if status != structs.JobStatusPending {
t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending)
}
}
func TestStateWatch_watch(t *testing.T) {
sw := newStateWatch()
notify1 := make(chan struct{}, 1)

View File

@@ -689,10 +689,9 @@ const (
)
const (
JobStatusPending = "pending" // Pending means the job is waiting on scheduling
JobStatusRunning = "running" // Running means the entire job is running
JobStatusComplete = "complete" // Complete means there was a clean termination
JobStatusDead = "dead" // Dead means there was abnormal termination
JobStatusPending = "pending" // Pending means the job is waiting on scheduling
JobStatusRunning = "running" // Running means the job has non-terminal allocations
JobStatusDead = "dead" // Dead means all evaluation's and allocations are terminal
)
const (
@@ -778,8 +777,9 @@ type Job struct {
StatusDescription string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}
// InitFields is used to initialize fields in the Job. This should be called

View File

@@ -92,10 +92,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
}
// If the definition is updated we need to update
// XXX: This is an extremely conservative approach. We can check
// if the job definition has changed in a way that affects
// this allocation and potentially ignore it.
if job.ModifyIndex != exist.Job.ModifyIndex {
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
Name: name,
TaskGroup: tg,

View File

@@ -38,7 +38,7 @@ func TestDiffAllocs(t *testing.T) {
// The "old" job has a previous modify index
oldJob := new(structs.Job)
*oldJob = *job
oldJob.ModifyIndex -= 1
oldJob.JobModifyIndex -= 1
tainted := map[string]bool{
"dead": true,
@@ -119,7 +119,7 @@ func TestDiffSystemAllocs(t *testing.T) {
// The "old" job has a previous modify index
oldJob := new(structs.Job)
*oldJob = *job
oldJob.ModifyIndex -= 1
oldJob.JobModifyIndex -= 1
tainted := map[string]bool{
"dead": true,