From 191b8626d25c0558b168ceac58a278167b7ea0e1 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 17 Oct 2018 23:06:53 -0500 Subject: [PATCH] More review comments --- api/operator_test.go | 2 +- nomad/plan_apply.go | 11 +-- nomad/state/state_store.go | 168 +++++++++++++++++++------------------ 3 files changed, 91 insertions(+), 90 deletions(-) diff --git a/api/operator_test.go b/api/operator_test.go index 9c95228af..25dcf44a8 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -77,7 +77,7 @@ func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) { config, _, err = operator.SchedulerGetConfiguration(nil) require.Nil(err) - require.True(config.PreemptionConfig.SystemSchedulerEnabled) + require.False(config.PreemptionConfig.SystemSchedulerEnabled) } func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) { diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f48d7c4fb..205609ce5 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -191,9 +191,6 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap // Also gather jobids to create follow up evals preemptedJobIDs := make(map[structs.NamespacedID]struct{}) for _, alloc := range req.NodePreemptions { - if alloc.CreateTime == 0 { - alloc.CreateTime = now - } alloc.ModifyTime = now id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} _, ok := preemptedJobIDs[id] @@ -368,13 +365,13 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan alloc, err := snap.AllocByID(nil, preemptedAlloc.ID) if err != nil { mErr.Errors = append(mErr.Errors, err) + continue } - if alloc != nil { - if !alloc.TerminalStatus() { - filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc) - } + if alloc != nil && !alloc.TerminalStatus() { + filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc) } } + result.NodePreemptions[nodeID] = filteredNodePreemptions } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 09b742750..793d01e83 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -219,6 +219,7 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } // Prepare preempted allocs in the plan results for update + var preemptedAllocs []*structs.Allocation for _, preemptedAlloc := range results.NodePreemptions { // Look for existing alloc existing, err := txn.First("allocs", "id", preemptedAlloc.ID) @@ -239,11 +240,14 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription + copyAlloc.ModifyTime = preemptedAlloc.ModifyTime + preemptedAllocs = append(preemptedAllocs, copyAlloc) - // Upsert the preempted allocations - if err := s.upsertAllocsImpl(index, []*structs.Allocation{copyAlloc}, txn); err != nil { - return err - } + } + + // Upsert the preempted allocations + if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil { + return err } // Upsert followup evals for allocs that were preempted @@ -3855,6 +3859,84 @@ func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs return nil } +// SchedulerConfig is used to get the current Scheduler configuration. +func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the scheduler config + c, err := tx.First("scheduler_config", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err) + } + + config, ok := c.(*structs.SchedulerConfiguration) + if !ok { + return 0, nil, nil + } + + return config.ModifyIndex, config, nil +} + +// SchedulerSetConfig is used to set the current Scheduler configuration. +func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error { + tx := s.db.Txn(true) + defer tx.Abort() + + s.schedulerSetConfigTxn(idx, tx, config) + + tx.Commit() + return nil +} + +// SchedulerCASConfig is used to try updating the scheduler configuration with a +// given Raft index. If the CAS index specified is not equal to the last observed index +// for the config, then the call is a noop, +func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for an existing config + existing, err := tx.First("scheduler_config", "id") + if err != nil { + return false, fmt.Errorf("failed scheduler config lookup: %s", err) + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + e, ok := existing.(*structs.SchedulerConfiguration) + if !ok || e.ModifyIndex != cidx { + return false, nil + } + + s.schedulerSetConfigTxn(idx, tx, config) + + tx.Commit() + return true, nil +} + +func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error { + // Check for an existing config + existing, err := tx.First("scheduler_config", "id") + if err != nil { + return fmt.Errorf("failed scheduler config lookup: %s", err) + } + + // Set the indexes. + if existing != nil { + config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex + } else { + config.CreateIndex = idx + } + config.ModifyIndex = idx + + if err := tx.Insert("scheduler_config", config); err != nil { + return fmt.Errorf("failed updating scheduler config: %s", err) + } + return nil +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore @@ -3998,81 +4080,3 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } } } - -// SchedulerConfig is used to get the current Scheduler configuration. -func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the scheduler config - c, err := tx.First("scheduler_config", "id") - if err != nil { - return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err) - } - - config, ok := c.(*structs.SchedulerConfiguration) - if !ok { - return 0, nil, nil - } - - return config.ModifyIndex, config, nil -} - -// SchedulerSetConfig is used to set the current Scheduler configuration. -func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error { - tx := s.db.Txn(true) - defer tx.Abort() - - s.schedulerSetConfigTxn(idx, tx, config) - - tx.Commit() - return nil -} - -// SchedulerCASConfig is used to try updating the scheduler configuration with a -// given Raft index. If the CAS index specified is not equal to the last observed index -// for the config, then the call is a noop, -func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Check for an existing config - existing, err := tx.First("scheduler_config", "id") - if err != nil { - return false, fmt.Errorf("failed scheduler config lookup: %s", err) - } - - // If the existing index does not match the provided CAS - // index arg, then we shouldn't update anything and can safely - // return early here. - e, ok := existing.(*structs.SchedulerConfiguration) - if !ok || e.ModifyIndex != cidx { - return false, nil - } - - s.schedulerSetConfigTxn(idx, tx, config) - - tx.Commit() - return true, nil -} - -func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error { - // Check for an existing config - existing, err := tx.First("scheduler_config", "id") - if err != nil { - return fmt.Errorf("failed scheduler config lookup: %s", err) - } - - // Set the indexes. - if existing != nil { - config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex - } else { - config.CreateIndex = idx - } - config.ModifyIndex = idx - - if err := tx.Insert("scheduler_config", config); err != nil { - return fmt.Errorf("failed updating scheduler config: %s", err) - } - return nil -}