From 36abb97d87eac2872d69be6fc2ba8a549878b3f4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 19 Jul 2016 16:15:57 -0700 Subject: [PATCH] Applying changes to job updates via FSM --- nomad/fsm.go | 27 ++++++++++++++++++ nomad/state/state_store.go | 57 +++++++++++++++++++------------------- 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index f6af44004..6b2c28a6f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -239,6 +239,11 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { return err } + if err := n.state.UpdateSummaryWithJob(req.Job, index); err != nil { + n.logger.Printf("[ERR] nomad.fsm: Updating job summary failed: %v", err) + return err + } + // We always add the job to the periodic dispatcher because there is the // possibility that the periodic spec was removed and then we should stop // tracking it. @@ -393,6 +398,10 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { } } + if err := n.updateJobSummary(index, req.Alloc); err != nil { + return err + } + if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err) return err @@ -410,6 +419,10 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } + if err := n.updateJobSummary(index, req.Alloc); err != nil { + return err + } + // Update all the client allocations if err := n.state.UpdateAllocsFromClient(index, req.Alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err) @@ -435,6 +448,20 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } +func (n *nomadFSM) updateJobSummary(index uint64, allocations []*structs.Allocation) error { + for _, alloc := range allocations { + existingAlloc, err := n.state.AllocByID(alloc.ID) + if err != nil { + return fmt.Errorf("unable to get allocation from state store: %v", err) + } + if err := n.state.UpdateSummaryWithAlloc(alloc, existingAlloc, index); err != nil { + return err + } + } + + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 5f194e41b..2ab39dc27 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -344,11 +344,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("index update failed: %v", err) } - // Update the job summary - if err := s.updateSummaryWithJob(job, index, watcher, txn); err != nil { - return fmt.Errorf("job summary update failed: %v", err) - } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -856,11 +851,6 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I } exist := existing.(*structs.Allocation) - // Update the job summary - if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil { - return fmt.Errorf("unable to update job summary: %v", err) - } - // Trigger the watcher watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{AllocEval: exist.EvalID}) @@ -914,9 +904,6 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er } exist, _ := existing.(*structs.Allocation) - if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil { - return fmt.Errorf("updating job summary failed: %v", err) - } if exist == nil { alloc.CreateIndex = index alloc.ModifyIndex = index @@ -1250,7 +1237,12 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated -func (s *StateStore) updateSummaryWithJob(job *structs.Job, index uint64, watcher watch.Items, txn *memdb.Txn) error { +func (s *StateStore) UpdateSummaryWithJob(job *structs.Job, index uint64) error { + txn := s.db.Txn(true) + defer txn.Abort() + + watcher := watch.NewItems() + existing, err := s.JobSummaryByID(job.ID) if err != nil { return fmt.Errorf("unable to retrieve summary for job: %v", err) @@ -1293,32 +1285,38 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, index uint64, watche if err := txn.Insert("job_summary", existing); err != nil { return err } + + txn.Defer(func() { s.watch.notify(watcher) }) + txn.Commit() return nil } // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted -func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, - existingAlloc *structs.Allocation, index uint64, watcher watch.Items, txn *memdb.Txn) error { +func (s *StateStore) UpdateSummaryWithAlloc(newAlloc *structs.Allocation, + existingAlloc *structs.Allocation, index uint64) error { + txn := s.db.Txn(true) + defer txn.Abort() + watcher := watch.NewItems() - existing, err := s.JobSummaryByID(newAlloc.JobID) + jobID := newAlloc.JobID + taskGroup := newAlloc.TaskGroup + if existingAlloc != nil { + jobID = existingAlloc.JobID + taskGroup = existingAlloc.TaskGroup + } + + existing, err := s.JobSummaryByID(jobID) if err != nil { return fmt.Errorf("lookup of job summary failed: %v", err) } - - // If we can't find an existing job summary entry then we are not going to create a - // new job summary entry for an allocation with that job id since we don't - // know the task group counts for that job - // TODO May be we can query the job and scan all the allocations for that - // job and create the summary before applying the change of summary state - // that this allocation would cause. if existing == nil { - return nil + return fmt.Errorf("unable to find job summary") } - tgSummary, ok := existing.Summary[newAlloc.TaskGroup] + tgSummary, ok := existing.Summary[taskGroup] if !ok { - return nil + return fmt.Errorf("unable to find task group in the job summary: %v", taskGroup) } var hasSummaryChanged bool if existingAlloc == nil { @@ -1378,10 +1376,13 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, } } - existing.Summary[newAlloc.TaskGroup] = tgSummary + existing.Summary[taskGroup] = tgSummary if err := txn.Insert("job_summary", existing); err != nil { return fmt.Errorf("inserting job summary failed: %v", err) } + + txn.Defer(func() { s.watch.notify(watcher) }) + txn.Commit() return nil }