Applying changes to job updates via FSM

This commit is contained in:
Diptanu Choudhury
2016-07-19 16:15:57 -07:00
parent 5b2f2ee6c5
commit 36abb97d87
2 changed files with 56 additions and 28 deletions

View File

@@ -239,6 +239,11 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return err
}
if err := n.state.UpdateSummaryWithJob(req.Job, index); err != nil {
n.logger.Printf("[ERR] nomad.fsm: Updating job summary failed: %v", err)
return err
}
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
@@ -393,6 +398,10 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
}
}
if err := n.updateJobSummary(index, req.Alloc); err != nil {
return err
}
if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err)
return err
@@ -410,6 +419,10 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
if err := n.updateJobSummary(index, req.Alloc); err != nil {
return err
}
// Update all the client allocations
if err := n.state.UpdateAllocsFromClient(index, req.Alloc); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err)
@@ -435,6 +448,20 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil
}
func (n *nomadFSM) updateJobSummary(index uint64, allocations []*structs.Allocation) error {
for _, alloc := range allocations {
existingAlloc, err := n.state.AllocByID(alloc.ID)
if err != nil {
return fmt.Errorf("unable to get allocation from state store: %v", err)
}
if err := n.state.UpdateSummaryWithAlloc(alloc, existingAlloc, index); err != nil {
return err
}
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()

View File

@@ -344,11 +344,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("index update failed: %v", err)
}
// Update the job summary
if err := s.updateSummaryWithJob(job, index, watcher, txn); err != nil {
return fmt.Errorf("job summary update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
@@ -856,11 +851,6 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I
}
exist := existing.(*structs.Allocation)
// Update the job summary
if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil {
return fmt.Errorf("unable to update job summary: %v", err)
}
// Trigger the watcher
watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: exist.EvalID})
@@ -914,9 +904,6 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
}
exist, _ := existing.(*structs.Allocation)
if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
if exist == nil {
alloc.CreateIndex = index
alloc.ModifyIndex = index
@@ -1250,7 +1237,12 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
// updateSummaryWithJob creates or updates job summaries when new jobs are
// upserted or existing ones are updated
func (s *StateStore) updateSummaryWithJob(job *structs.Job, index uint64, watcher watch.Items, txn *memdb.Txn) error {
func (s *StateStore) UpdateSummaryWithJob(job *structs.Job, index uint64) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
existing, err := s.JobSummaryByID(job.ID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for job: %v", err)
@@ -1293,32 +1285,38 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, index uint64, watche
if err := txn.Insert("job_summary", existing); err != nil {
return err
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
// updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted
func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
existingAlloc *structs.Allocation, index uint64, watcher watch.Items, txn *memdb.Txn) error {
func (s *StateStore) UpdateSummaryWithAlloc(newAlloc *structs.Allocation,
existingAlloc *structs.Allocation, index uint64) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
existing, err := s.JobSummaryByID(newAlloc.JobID)
jobID := newAlloc.JobID
taskGroup := newAlloc.TaskGroup
if existingAlloc != nil {
jobID = existingAlloc.JobID
taskGroup = existingAlloc.TaskGroup
}
existing, err := s.JobSummaryByID(jobID)
if err != nil {
return fmt.Errorf("lookup of job summary failed: %v", err)
}
// If we can't find an existing job summary entry then we are not going to create a
// new job summary entry for an allocation with that job id since we don't
// know the task group counts for that job
// TODO May be we can query the job and scan all the allocations for that
// job and create the summary before applying the change of summary state
// that this allocation would cause.
if existing == nil {
return nil
return fmt.Errorf("unable to find job summary")
}
tgSummary, ok := existing.Summary[newAlloc.TaskGroup]
tgSummary, ok := existing.Summary[taskGroup]
if !ok {
return nil
return fmt.Errorf("unable to find task group in the job summary: %v", taskGroup)
}
var hasSummaryChanged bool
if existingAlloc == nil {
@@ -1378,10 +1376,13 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
}
}
existing.Summary[newAlloc.TaskGroup] = tgSummary
existing.Summary[taskGroup] = tgSummary
if err := txn.Insert("job_summary", existing); err != nil {
return fmt.Errorf("inserting job summary failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}