More review comments

This commit is contained in:
Preetha Appan
2018-10-17 23:06:53 -05:00
parent c4a04eb1b2
commit 191b8626d2
3 changed files with 91 additions and 90 deletions

View File

@@ -77,7 +77,7 @@ func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {
config, _, err = operator.SchedulerGetConfiguration(nil)
require.Nil(err)
require.True(config.PreemptionConfig.SystemSchedulerEnabled)
require.False(config.PreemptionConfig.SystemSchedulerEnabled)
}
func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {

View File

@@ -191,9 +191,6 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
_, ok := preemptedJobIDs[id]
@@ -368,13 +365,13 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
alloc, err := snap.AllocByID(nil, preemptedAlloc.ID)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
if alloc != nil {
if !alloc.TerminalStatus() {
filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc)
}
if alloc != nil && !alloc.TerminalStatus() {
filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc)
}
}
result.NodePreemptions[nodeID] = filteredNodePreemptions
}

View File

@@ -219,6 +219,7 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
}
// Prepare preempted allocs in the plan results for update
var preemptedAllocs []*structs.Allocation
for _, preemptedAlloc := range results.NodePreemptions {
// Look for existing alloc
existing, err := txn.First("allocs", "id", preemptedAlloc.ID)
@@ -239,11 +240,14 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus
copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation
copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription
copyAlloc.ModifyTime = preemptedAlloc.ModifyTime
preemptedAllocs = append(preemptedAllocs, copyAlloc)
// Upsert the preempted allocations
if err := s.upsertAllocsImpl(index, []*structs.Allocation{copyAlloc}, txn); err != nil {
return err
}
}
// Upsert the preempted allocations
if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil {
return err
}
// Upsert followup evals for allocs that were preempted
@@ -3855,6 +3859,84 @@ func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs
return nil
}
// SchedulerConfig is used to get the current Scheduler configuration.
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the scheduler config
c, err := tx.First("scheduler_config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
}
config, ok := c.(*structs.SchedulerConfiguration)
if !ok {
return 0, nil, nil
}
return config.ModifyIndex, config, nil
}
// SchedulerSetConfig is used to set the current Scheduler configuration.
func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error {
tx := s.db.Txn(true)
defer tx.Abort()
s.schedulerSetConfigTxn(idx, tx, config)
tx.Commit()
return nil
}
// SchedulerCASConfig is used to try updating the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
return false, fmt.Errorf("failed scheduler config lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.SchedulerConfiguration)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
s.schedulerSetConfigTxn(idx, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error {
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
return fmt.Errorf("failed scheduler config lookup: %s", err)
}
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex
} else {
config.CreateIndex = idx
}
config.ModifyIndex = idx
if err := tx.Insert("scheduler_config", config); err != nil {
return fmt.Errorf("failed updating scheduler config: %s", err)
}
return nil
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
@@ -3998,81 +4080,3 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}
}
}
// SchedulerConfig is used to get the current Scheduler configuration.
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the scheduler config
c, err := tx.First("scheduler_config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
}
config, ok := c.(*structs.SchedulerConfiguration)
if !ok {
return 0, nil, nil
}
return config.ModifyIndex, config, nil
}
// SchedulerSetConfig is used to set the current Scheduler configuration.
func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error {
tx := s.db.Txn(true)
defer tx.Abort()
s.schedulerSetConfigTxn(idx, tx, config)
tx.Commit()
return nil
}
// SchedulerCASConfig is used to try updating the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
return false, fmt.Errorf("failed scheduler config lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.SchedulerConfiguration)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
s.schedulerSetConfigTxn(idx, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error {
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
return fmt.Errorf("failed scheduler config lookup: %s", err)
}
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex
} else {
config.CreateIndex = idx
}
config.ModifyIndex = idx
if err := tx.Insert("scheduler_config", config); err != nil {
return fmt.Errorf("failed updating scheduler config: %s", err)
}
return nil
}