modify state store so that autoscaling policies are deleted from their
table when a job is stopped (and recreated when the job is started)
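
In effect, stopping a job now removes its scaling policies from the state store, and starting it again re-adds them. The hypothetical condensed test below restates what the two new tests in this diff verify, using only helpers that appear in the diff (mock.JobWithScalingPolicy, testStateStore, UpsertJob, ScalingPolicyByTarget); the raft index values are arbitrary.

// Hypothetical sketch, not part of the commit: the scaling policy tracks Job.Stop.
func TestSketch_ScalingPolicyFollowsJobStatus(t *testing.T) {
	require := require.New(t)
	state := testStateStore(t)

	job, policy := mock.JobWithScalingPolicy()

	// Upserting a running job writes its scaling policy.
	require.NoError(state.UpsertJob(1000, job))
	out, err := state.ScalingPolicyByTarget(nil, policy.Target)
	require.NoError(err)
	require.NotNil(out)

	// Stopping the job deletes the policy from the scaling_policy table.
	job.Stop = true
	require.NoError(state.UpsertJob(1001, job))
	out, err = state.ScalingPolicyByTarget(nil, policy.Target)
	require.NoError(err)
	require.Nil(out)

	// Starting the job again recreates the policy.
	job.Stop = false
	require.NoError(state.UpsertJob(1002, job))
	out, err = state.ScalingPolicyByTarget(nil, policy.Target)
	require.NoError(err)
	require.NotNil(out)
}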
nomad/state/state_store.go

@@ -1478,16 +1478,10 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
 		return fmt.Errorf("index update failed: %v", err)
 	}
 
-	// Delete any job scaling policies
-	numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", namespace, jobID)
-	if err != nil {
+	// Delete any remaining job scaling policies
+	if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
 		return fmt.Errorf("deleting job scaling policies failed: %v", err)
 	}
-	if numDeletedScalingPolicies > 0 {
-		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
-			return fmt.Errorf("index update failed: %v", err)
-		}
-	}
 
 	// Delete the scaling events
 	if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
@@ -1506,6 +1500,20 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
 	return nil
 }
 
+// deleteJobScalingPolicies deletes any scaling policies associated with the job
+func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
+	numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID)
+	if err != nil {
+		return fmt.Errorf("deleting job scaling policies failed: %v", err)
+	}
+	if numDeletedScalingPolicies > 0 {
+		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
+			return fmt.Errorf("index update failed: %v", err)
+		}
+	}
+	return nil
+}
+
 // deleteJobVersions deletes all versions of the given job.
 func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error {
 	iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
@@ -4247,6 +4255,13 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
 
 	ws := memdb.NewWatchSet()
 
+	if job.Stop {
+		if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
+			return fmt.Errorf("deleting job scaling policies failed: %v", err)
+		}
+		return nil
+	}
+
 	scalingPolicies := job.GetScalingPolicies()
 	newTargets := map[string]struct{}{}
 	for _, p := range scalingPolicies {
nomad/state/state_store_test.go

@@ -8427,7 +8427,96 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
 	require.False(watchFired(ws))
 }
 
-func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) {
+func TestStateStore_StopJob_DeleteScalingPolicies(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	state := testStateStore(t)
+
+	job := mock.Job()
+
+	err := state.UpsertJob(1000, job)
+	require.NoError(err)
+
+	policy := mock.ScalingPolicy()
+	policy.Target[structs.ScalingTargetJob] = job.ID
+	err = state.UpsertScalingPolicies(1100, []*structs.ScalingPolicy{policy})
+	require.NoError(err)
+
+	// Ensure the scaling policy is present and start some watches
+	wsGet := memdb.NewWatchSet()
+	out, err := state.ScalingPolicyByTarget(wsGet, policy.Target)
+	require.NoError(err)
+	require.NotNil(out)
+	wsList := memdb.NewWatchSet()
+	_, err = state.ScalingPolicies(wsList)
+	require.NoError(err)
+
+	// Stop the job
+	job, err = state.JobByID(nil, job.Namespace, job.ID)
+	require.NoError(err)
+	job.Stop = true
+	err = state.UpsertJob(1200, job)
+	require.NoError(err)
+
+	// Ensure:
+	// * the scaling policy was deleted
+	// * the watches were fired
+	// * the table index was advanced
+	require.True(watchFired(wsGet))
+	require.True(watchFired(wsList))
+	out, err = state.ScalingPolicyByTarget(nil, policy.Target)
+	require.NoError(err)
+	require.Nil(out)
+	index, err := state.Index("scaling_policy")
+	require.GreaterOrEqual(index, uint64(1200))
+}
+
+func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	state := testStateStore(t)
+
+	job, policy := mock.JobWithScalingPolicy()
+	job.Stop = true
+
+	// establish watcher, verify there are no scaling policies yet
+	ws := memdb.NewWatchSet()
+	list, err := state.ScalingPolicies(ws)
+	require.NoError(err)
+	require.Nil(list.Next())
+
+	// upsert a stopped job, verify that we don't fire the watcher or add any scaling policies
+	err = state.UpsertJob(1000, job)
+	require.NoError(err)
+	require.False(watchFired(ws))
+	// stopped job should have no scaling policies, watcher doesn't fire
+	list, err = state.ScalingPolicies(ws)
+	require.NoError(err)
+	require.Nil(list.Next())
+
+	// Establish a new watcher
+	ws = memdb.NewWatchSet()
+	_, err = state.ScalingPolicies(ws)
+	require.NoError(err)
+	// Unstop this job, say you'll run it again...
+	job.Stop = false
+	err = state.UpsertJob(1100, job)
+	require.NoError(err)
+
+	// Ensure the scaling policy was added, watch was fired, index was advanced
+	require.True(watchFired(ws))
+	out, err := state.ScalingPolicyByTarget(nil, policy.Target)
+	require.NoError(err)
+	require.NotNil(out)
+	index, err := state.Index("scaling_policy")
+	require.GreaterOrEqual(index, uint64(1100))
+}
+
+func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
 	t.Parallel()
 
 	require := require.New(t)
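
Both helpers follow the state store's go-memdb conventions: table reads register watch channels, and any transaction that actually changes a table also advances that table's row in the "index" table. That is why deleteJobScalingPolicies only bumps the scaling_policy index entry when DeleteAll reported deleted rows, and why the tests assert both watchFired and state.Index. A minimal standalone sketch of that pattern (illustrative schema, table names, and index values, not Nomad's; error handling elided):

package main

import (
	"fmt"
	"time"

	memdb "github.com/hashicorp/go-memdb"
)

type Policy struct{ ID string }

// IndexEntry mirrors the shape of Nomad's state.IndexEntry.
type IndexEntry struct {
	Key   string
	Value uint64
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"policy": {
				Name: "policy",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
				},
			},
			"index": {
				Name: "index",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "Key"}},
				},
			},
		},
	}
	db, _ := memdb.NewMemDB(schema)

	// Seed one policy and record the table's modify index.
	txn := db.Txn(true)
	txn.Insert("policy", &Policy{ID: "web-policy"})
	txn.Insert("index", &IndexEntry{"policy", 1000})
	txn.Commit()

	// A blocking query registers the table iterator's watch channel.
	ws := memdb.NewWatchSet()
	read := db.Txn(false)
	iter, _ := read.Get("policy", "id")
	ws.Add(iter.WatchCh())

	// Delete all policies; advance the index entry only if rows were removed,
	// the same guard used by deleteJobScalingPolicies above.
	txn = db.Txn(true)
	n, _ := txn.DeleteAll("policy", "id")
	if n > 0 {
		txn.Insert("index", &IndexEntry{"policy", 1001})
	}
	txn.Commit()

	// Watch returns true on timeout, so false here means the delete fired the watch.
	fmt.Println("watch fired:", !ws.Watch(time.After(time.Second)))
}

Skipping the index bump when DeleteAll removed nothing keeps a no-op transaction from advancing the table index, so blocking queries keyed on a minimum index are not woken spuriously.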