From 3b4a1aecd94f5a2929e01fae07c6bfffb50cb968 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 21 Feb 2020 21:23:30 +0000 Subject: [PATCH] finished refactoring state store, schema, etc --- api/scaling.go | 10 +- api/scaling_test.go | 33 ++++-- command/agent/job_endpoint.go | 2 +- command/agent/scaling_endpoint.go | 11 +- nomad/job_endpoint.go | 33 +++++- nomad/mock/mock.go | 18 ++-- nomad/state/schema.go | 93 +++++++++++----- nomad/state/state_store.go | 69 +++++++----- nomad/state/state_store_test.go | 171 ++++++++++++++++++++++++++---- nomad/structs/funcs.go | 15 ++- nomad/structs/structs.go | 49 ++++++--- 11 files changed, 383 insertions(+), 121 deletions(-) diff --git a/api/scaling.go b/api/scaling.go index 579802ead..5107fdecf 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -39,6 +39,7 @@ type ScalingRequest struct { Value interface{} Reason string Error string + Meta map[string]interface{} WriteRequest // this is effectively a job update, so we need the ability to override policy. PolicyOverride bool @@ -48,8 +49,9 @@ type ScalingRequest struct { type ScalingPolicy struct { ID string Namespace string - Target string - JobID string + Target map[string]string + Min int64 + Max int64 Policy map[string]interface{} Enabled *bool CreateIndex uint64 @@ -60,8 +62,8 @@ type ScalingPolicy struct { // for the scaling policy list type ScalingPolicyListStub struct { ID string - JobID string - Target string + Enabled bool + Target map[string]string CreateIndex uint64 ModifyIndex uint64 } diff --git a/api/scaling_test.go b/api/scaling_test.go index 8b8e20526..6b7962414 100644 --- a/api/scaling_test.go +++ b/api/scaling_test.go @@ -1,13 +1,12 @@ package api import ( - "fmt" "testing" "github.com/stretchr/testify/require" ) -func TestScalingPolicies_List(t *testing.T) { +func TestScalingPolicies_ListPolicies(t *testing.T) { t.Parallel() require := require.New(t) @@ -36,12 +35,18 @@ func TestScalingPolicies_List(t *testing.T) { policy := policies[0] - // Check that the scaling policy references the right job - require.Equalf(policy.JobID, *job.ID, "expected JobID=%s, got: %s", *job.ID, policy.JobID) + // Check that the scaling policy references the right namespace + namespace := DefaultNamespace + if job.Namespace != nil && *job.Namespace != "" { + namespace = *job.Namespace + } + require.Equal(policy.Target["Namespace"], namespace) - // Check that the scaling policy references the right target - expectedTarget := fmt.Sprintf("/v1/job/%s/%s/scale", *job.ID, *job.TaskGroups[0].Name) - require.Equalf(expectedTarget, policy.Target, "expected Target=%s, got: %s", expectedTarget, policy.Target) + // Check that the scaling policy references the right job + require.Equal(policy.Target["Job"], *job.ID) + + // Check that the scaling policy references the right group + require.Equal(policy.Target["Group"], *job.TaskGroups[0].Name) } func TestScalingPolicies_GetPolicy(t *testing.T) { @@ -80,7 +85,7 @@ func TestScalingPolicies_GetPolicy(t *testing.T) { policies, _, err := scaling.ListPolicies(nil) require.NoError(err) for _, p := range policies { - if p.JobID == *job.ID { + if p.Target["Job"] == *job.ID { policyID = p.ID break } @@ -94,8 +99,16 @@ func TestScalingPolicies_GetPolicy(t *testing.T) { require.NoError(err) // Check that the scaling policy fields match - expectedTarget := fmt.Sprintf("/v1/job/%s/%s/scale", *job.ID, *job.TaskGroups[0].Name) - require.Equalf(expectedTarget, resp.Target, "expected Target=%s, got: %s", expectedTarget, policy.Target) + namespace := DefaultNamespace + if job.Namespace != nil && *job.Namespace != "" { + namespace = *job.Namespace + } + expectedTarget := map[string]string{ + "Namespace": namespace, + "Job": *job.ID, + "Group": *job.TaskGroups[0].Name, + } + require.Equal(expectedTarget, resp.Target) require.Equal(policy.Policy, resp.Policy) require.Equal(policy.Enabled, resp.Enabled) } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index ebee8bba2..b8d250ddc 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -824,7 +824,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta } if taskGroup.Scaling != nil { - tg.Scaling = ApiScalingPolicyToStructs(job, taskGroup.Scaling).TargetTaskGroup(job, tg) + tg.Scaling = ApiScalingPolicyToStructs(taskGroup.Scaling).TargetTaskGroup(job, tg) } tg.EphemeralDisk = &structs.EphemeralDisk{ diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 92493fc41..04a806700 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -75,11 +75,12 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ return out.Policy, nil } -func ApiScalingPolicyToStructs(job *structs.Job, a1 *api.ScalingPolicy) *structs.ScalingPolicy { +func ApiScalingPolicyToStructs(ap *api.ScalingPolicy) *structs.ScalingPolicy { return &structs.ScalingPolicy{ - Namespace: job.Namespace, - JobID: job.ID, - Enabled: *a1.Enabled, - Policy: a1.Policy, + Enabled: *ap.Enabled, + Min: ap.Min, + Max: ap.Max, + Policy: ap.Policy, + Target: map[string]string{}, } } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 8a979b096..185ead74f 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -14,13 +14,14 @@ import ( "github.com/golang/snappy" "github.com/hashicorp/consul/lib" + "github.com/pkg/errors" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" - "github.com/pkg/errors" ) const ( @@ -189,6 +190,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + // Ensure that all scaling policies have an appropriate ID + if err := propagateScalingPolicyIDs(existingJob, args.Job); err != nil { + return err + } + // Ensure that the job has permissions for the requested Vault tokens policies := args.Job.VaultPolicies() if len(policies) != 0 { @@ -332,6 +338,31 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } +// propagateScalingPolicyIDs propagates scaling policy IDs from existing job +// to updated job, or generates random IDs in new job +func propagateScalingPolicyIDs(old, new *structs.Job) error { + + oldIDs := make(map[string]string) + if old != nil { + // jobs currently only have scaling policies on task groups, so we can + // find correspondences using task group names + for _, p := range old.GetScalingPolicies() { + oldIDs[p.Target[structs.ScalingTargetGroup]] = p.ID + } + } + + // ignore any existing ID in the policy, they should be empty + for _, p := range new.GetScalingPolicies() { + if id, ok := oldIDs[p.Target[structs.ScalingTargetGroup]]; ok { + p.ID = id + } else { + p.ID = uuid.Generate() + } + } + + return nil +} + // getSignalConstraint builds a suitable constraint based on the required // signals func getSignalConstraint(signals []string) *structs.Constraint { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index fb5b24f3d..a52838ff9 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1256,10 +1256,12 @@ func ACLManagementToken() *structs.ACLToken { func ScalingPolicy() *structs.ScalingPolicy { return &structs.ScalingPolicy{ - ID: uuid.Generate(), - Namespace: structs.DefaultNamespace, - Target: uuid.Generate(), - JobID: uuid.Generate(), + ID: uuid.Generate(), + Target: map[string]string{ + structs.ScalingTargetNamespace: structs.DefaultNamespace, + structs.ScalingTargetJob: uuid.Generate(), + structs.ScalingTargetGroup: uuid.Generate(), + }, Policy: map[string]interface{}{ "a": "b", }, @@ -1272,11 +1274,9 @@ func ScalingPolicy() *structs.ScalingPolicy { func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { job := Job() policy := &structs.ScalingPolicy{ - ID: uuid.Generate(), - Namespace: job.Namespace, - JobID: job.ID, - Policy: map[string]interface{}{}, - Enabled: true, + ID: uuid.Generate(), + Policy: map[string]interface{}{}, + Enabled: true, } policy.TargetTaskGroup(job, job.TaskGroups[0]) job.TaskGroups[0].Scaling = policy diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 449fcd6f6..a87738c03 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -5,6 +5,7 @@ import ( "sync" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/structs" ) @@ -730,6 +731,63 @@ func csiPluginTableSchema() *memdb.TableSchema { } } +// StringFieldIndex is used to extract a field from an object +// using reflection and builds an index on that field. +type ScalingPolicyTargetFieldIndex struct { + Field string +} + +// FromObject is used to extract an index value from an +// object or to indicate that the index value is missing. +func (s *ScalingPolicyTargetFieldIndex) FromObject(obj interface{}) (bool, []byte, error) { + policy, ok := obj.(*structs.ScalingPolicy) + if !ok { + return false, nil, fmt.Errorf("object %#v is not a ScalingPolicy", obj) + } + + if policy.Target == nil { + return false, nil, nil + } + + val, ok := policy.Target[s.Field] + if !ok { + return false, nil, nil + } + + // Add the null character as a terminator + val += "\x00" + return true, []byte(val), nil +} + +// FromArgs is used to build an exact index lookup based on arguments +func (s *ScalingPolicyTargetFieldIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + arg, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[0]) + } + // Add the null character as a terminator + arg += "\x00" + return []byte(arg), nil +} + +// PrefixFromArgs returns a prefix that should be used for scanning based on the arguments +func (s *ScalingPolicyTargetFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) { + val, err := s.FromArgs(args...) + if err != nil { + return nil, err + } + + // Strip the null terminator, the rest is a prefix + n := len(val) + if n > 0 { + return val[:n-1], nil + } + return val, nil +} + // scalingPolicyTableSchema returns the MemDB schema for the policy table. // This table is used to store the policies which are referenced by tokens func scalingPolicyTableSchema() *memdb.TableSchema { @@ -742,49 +800,32 @@ func scalingPolicyTableSchema() *memdb.TableSchema { AllowMissing: false, Unique: true, - // Use a compound index so the tuple of (Namespace, Target) is - // uniquely identifying + // UUID is uniquely identifying Indexer: &memdb.StringFieldIndex{ Field: "ID", }, }, - // Target index is used for looking up by target or listing policies in namespace - // A target can only have a single scaling policy, so this is guaranteed to be unique. + // Target index is used for listing by namespace or job, or looking up a specific target. + // A given task group can have only a single scaling policies, so this is guaranteed to be unique. "target": { Name: "target", AllowMissing: false, Unique: true, - // Use a compound index so the tuple of (Namespace, Target) is + // Use a compound index so the tuple of (Namespace, Job, Group) is // uniquely identifying Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ + &ScalingPolicyTargetFieldIndex{ Field: "Namespace", }, - &memdb.StringFieldIndex{ - Field: "Target", - }, - }, - }, - }, - // Job index is used to lookup scaling policies by job - "job": { - Name: "job", - AllowMissing: false, - Unique: false, - - // Use a compound index so the tuple of (Namespace, JobID) is - // uniquely identifying - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Namespace", + &ScalingPolicyTargetFieldIndex{ + Field: "Job", }, - &memdb.StringFieldIndex{ - Field: "JobID", + &ScalingPolicyTargetFieldIndex{ + Field: "Group", }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0e0316dc5..26d5dde6d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -10,10 +10,10 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/helper/uuid" - "github.com/hashicorp/nomad/nomad/structs" "github.com/pkg/errors" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" ) // Txn is a transaction against a state store. @@ -1284,12 +1284,15 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn return fmt.Errorf("index update failed: %v", err) } - // Delete the job scaling policies - if _, err := txn.DeleteAll("scaling_policy", "job", namespace, jobID); err != nil { + // Delete any job scaling policies + numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", namespace, jobID) + if err != nil { return fmt.Errorf("deleting job scaling policies failed: %v", err) } - if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + if numDeletedScalingPolicies > 0 { + if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } return nil @@ -4002,7 +4005,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx scalingPolicies := job.GetScalingPolicies() newTargets := map[string]struct{}{} for _, p := range scalingPolicies { - newTargets[p.Target] = struct{}{} + newTargets[p.Target[structs.ScalingTargetGroup]] = struct{}{} } // find existing policies that need to be deleted deletedPolicies := []string{} @@ -4016,7 +4019,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx break } oldPolicy := raw.(*structs.ScalingPolicy) - if _, ok := newTargets[oldPolicy.Target]; !ok { + if _, ok := newTargets[oldPolicy.Target[structs.ScalingTargetGroup]]; !ok { deletedPolicies = append(deletedPolicies, oldPolicy.ID) } } @@ -4720,35 +4723,45 @@ func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*stru func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy, txn *memdb.Txn) error { - for _, scalingPolicy := range scalingPolicies { + hadUpdates := false + + for _, policy := range scalingPolicies { // Check if the scaling policy already exists - existing, err := txn.First("scaling_policy", "target", scalingPolicy.Namespace, scalingPolicy.Target) + existing, err := txn.First("scaling_policy", "target", + policy.Target[structs.ScalingTargetNamespace], + policy.Target[structs.ScalingTargetJob], + policy.Target[structs.ScalingTargetGroup]) if err != nil { return fmt.Errorf("scaling policy lookup failed: %v", err) } // Setup the indexes correctly if existing != nil { - scalingPolicy.CreateIndex = existing.(*structs.ScalingPolicy).CreateIndex - scalingPolicy.ModifyIndex = index - scalingPolicy.ID = existing.(*structs.ScalingPolicy).ID - } else { - scalingPolicy.CreateIndex = index - scalingPolicy.ModifyIndex = index - if scalingPolicy.ID == "" { - scalingPolicy.ID = uuid.Generate() + p := existing.(*structs.ScalingPolicy) + if !p.Diff(policy) { + continue } + policy.ID = p.ID + policy.CreateIndex = p.CreateIndex + policy.ModifyIndex = index + } else { + // policy.ID must have been set already in Job.Register before log apply + policy.CreateIndex = index + policy.ModifyIndex = index } // Insert the scaling policy - if err := txn.Insert("scaling_policy", scalingPolicy); err != nil { + hadUpdates = true + if err := txn.Insert("scaling_policy", policy); err != nil { return err } } // Update the indexes table for scaling policy - if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + if hadUpdates { + if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } return nil @@ -4815,7 +4828,7 @@ func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID st func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string, txn *memdb.Txn) (memdb.ResultIterator, error) { - iter, err := txn.Get("scaling_policy", "job", namespace, jobID) + iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID) if err != nil { return nil, err } @@ -4840,10 +4853,16 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S return nil, nil } -func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, namespace, target string) (*structs.ScalingPolicy, error) { +func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, target map[string]string) (*structs.ScalingPolicy, + error) { txn := s.db.Txn(false) - watchCh, existing, err := txn.FirstWatch("scaling_policy", "target", namespace, target) + // currently, only scaling policy type is against a task group + namespace := target[structs.ScalingTargetNamespace] + job := target[structs.ScalingTargetJob] + group := target[structs.ScalingTargetGroup] + + watchCh, existing, err := txn.FirstWatch("scaling_policy", "target", namespace, job, group) if err != nil { return nil, fmt.Errorf("scaling_policy lookup failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 991e2e940..52dc41e76 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -10,12 +10,13 @@ import ( "time" "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func testStateStore(t *testing.T) *StateStore { @@ -7928,10 +7929,10 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) { policy2 := mock.ScalingPolicy() ws := memdb.NewWatchSet() - _, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + _, err := state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) - _, err = state.ScalingPolicyByTarget(ws, policy.Namespace, policy2.Target) + _, err = state.ScalingPolicyByTarget(ws, policy2.Target) require.NoError(err) err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy, policy2}) @@ -7939,15 +7940,15 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) { require.True(watchFired(ws)) ws = memdb.NewWatchSet() - out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + out, err := state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) require.Equal(policy, out) - out, err = state.ScalingPolicyByTarget(ws, policy2.Namespace, policy2.Target) + out, err = state.ScalingPolicyByTarget(ws, policy2.Target) require.NoError(err) require.Equal(policy2, out) - iter, err := state.ScalingPoliciesByNamespace(ws, policy.Namespace) + iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace]) require.NoError(err) // Ensure we see both policies @@ -7977,7 +7978,7 @@ func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { // Create a watchset so we can test that upsert fires the watch ws := memdb.NewWatchSet() - out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + out, err := state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) require.Nil(out) @@ -7987,7 +7988,7 @@ func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { require.True(watchFired(ws), "watch did not fire") ws = memdb.NewWatchSet() - out, err = state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + out, err = state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) require.NotNil(out) require.Equal(newIndex, out.CreateIndex) @@ -7997,6 +7998,94 @@ func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { require.Equal(newIndex, index) } +// Scaling Policy IDs are generated randomly during Job.Register +// Subsequent updates of the job should preserve the ID for the scaling policy +// associated with a given target. +func TestStateStore_UpsertJob_PreserveScalingPolicyIDsAndIndex(t *testing.T) { + t.Parallel() + + require := require.New(t) + + state := testStateStore(t) + job, policy := mock.JobWithScalingPolicy() + + var newIndex uint64 = 1000 + err := state.UpsertJob(newIndex, job) + require.NoError(err) + + ws := memdb.NewWatchSet() + p1, err := state.ScalingPolicyByTarget(ws, policy.Target) + require.NoError(err) + require.NotNil(p1) + require.Equal(newIndex, p1.CreateIndex) + require.Equal(newIndex, p1.ModifyIndex) + + index, err := state.Index("scaling_policy") + require.Equal(newIndex, index) + require.NotEmpty(p1.ID) + + // update the job + job.Meta["new-meta"] = "new-value" + newIndex += 100 + err = state.UpsertJob(newIndex, job) + require.NoError(err) + require.False(watchFired(ws), "watch should not have fired") + + p2, err := state.ScalingPolicyByTarget(nil, policy.Target) + require.NoError(err) + require.NotNil(p2) + require.Equal(p1.ID, p2.ID, "ID should not have changed") + require.Equal(p1.CreateIndex, p2.CreateIndex) + require.Equal(p1.ModifyIndex, p2.ModifyIndex) + + index, err = state.Index("scaling_policy") + require.Equal(index, p1.CreateIndex, "table index should not have changed") +} + +// Updating the scaling policy for a job should update the index table and fire the watch. +// This test is the converse of TestStateStore_UpsertJob_PreserveScalingPolicyIDsAndIndex +func TestStateStore_UpsertJob_UpdateScalingPolicy(t *testing.T) { + t.Parallel() + + require := require.New(t) + + state := testStateStore(t) + job, policy := mock.JobWithScalingPolicy() + + var oldIndex uint64 = 1000 + require.NoError(state.UpsertJob(oldIndex, job)) + + ws := memdb.NewWatchSet() + p1, err := state.ScalingPolicyByTarget(ws, policy.Target) + require.NoError(err) + require.NotNil(p1) + require.Equal(oldIndex, p1.CreateIndex) + require.Equal(oldIndex, p1.ModifyIndex) + prevId := p1.ID + + index, err := state.Index("scaling_policy") + require.Equal(oldIndex, index) + require.NotEmpty(p1.ID) + + // update the job with the updated scaling policy; make sure to use a different object + newPolicy := structs.CopyScalingPolicy(p1) + newPolicy.Policy["new-field"] = "new-value" + job.TaskGroups[0].Scaling = newPolicy + require.NoError(state.UpsertJob(oldIndex+100, job)) + require.True(watchFired(ws), "watch should have fired") + + p2, err := state.ScalingPolicyByTarget(nil, policy.Target) + require.NoError(err) + require.NotNil(p2) + require.Equal(p2.Policy["new-field"], "new-value") + require.Equal(prevId, p2.ID, "ID should not have changed") + require.Equal(oldIndex, p2.CreateIndex) + require.Greater(p2.ModifyIndex, oldIndex, "ModifyIndex should have advanced") + + index, err = state.Index("scaling_policy") + require.Greater(index, oldIndex, "table index should have advanced") +} + func TestStateStore_DeleteScalingPolicies(t *testing.T) { t.Parallel() @@ -8012,7 +8101,7 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) { // Create a watcher ws := memdb.NewWatchSet() - _, err = state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + _, err = state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) // Delete the policy @@ -8024,17 +8113,17 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) { // Ensure we don't get the objects back ws = memdb.NewWatchSet() - out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + out, err := state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) require.Nil(out) ws = memdb.NewWatchSet() - out, err = state.ScalingPolicyByTarget(ws, policy2.Namespace, policy2.Target) + out, err = state.ScalingPolicyByTarget(ws, policy2.Target) require.NoError(err) require.Nil(out) // Ensure we see both policies - iter, err := state.ScalingPoliciesByNamespace(ws, policy.Namespace) + iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace]) require.NoError(err) count := 0 for { @@ -8065,7 +8154,7 @@ func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) { require.NoError(err) policy := mock.ScalingPolicy() - policy.JobID = job.ID + policy.Target[structs.ScalingTargetJob] = job.ID err = state.UpsertScalingPolicies(1001, []*structs.ScalingPolicy{policy}) require.NoError(err) @@ -8075,13 +8164,44 @@ func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) { // Ensure the scaling policy was deleted ws := memdb.NewWatchSet() - out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target) + out, err := state.ScalingPolicyByTarget(ws, policy.Target) require.NoError(err) require.Nil(out) index, err := state.Index("scaling_policy") require.True(index > 1001) } +// This test ensures that deleting a job that doesn't have any scaling policies +// will not cause the scaling_policy table index to increase, on either job +// registration or deletion. +func TestStateStore_DeleteJob_ScalingPolicyIndexNoop(t *testing.T) { + t.Parallel() + + require := require.New(t) + + state := testStateStore(t) + + job := mock.Job() + + prevIndex, err := state.Index("scaling_policy") + require.NoError(err) + + err = state.UpsertJob(1000, job) + require.NoError(err) + + newIndex, err := state.Index("scaling_policy") + require.NoError(err) + require.Equal(prevIndex, newIndex) + + // Delete the job + err = state.DeleteJob(1002, job.Namespace, job.ID) + require.NoError(err) + + newIndex, err = state.Index("scaling_policy") + require.NoError(err) + require.Equal(prevIndex, newIndex) +} + func TestStateStore_ScalingPoliciesByJob(t *testing.T) { t.Parallel() @@ -8091,14 +8211,16 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) { policyA := mock.ScalingPolicy() policyB1 := mock.ScalingPolicy() policyB2 := mock.ScalingPolicy() - policyB1.JobID = policyB2.JobID + policyB1.Target[structs.ScalingTargetJob] = policyB2.Target[structs.ScalingTargetJob] // Create the policies var baseIndex uint64 = 1000 err := state.UpsertScalingPolicies(baseIndex, []*structs.ScalingPolicy{policyA, policyB1, policyB2}) require.NoError(err) - iter, err := state.ScalingPoliciesByJob(nil, policyA.Namespace, policyA.JobID) + iter, err := state.ScalingPoliciesByJob(nil, + policyA.Target[structs.ScalingTargetNamespace], + policyA.Target[structs.ScalingTargetJob]) require.NoError(err) // Ensure we see expected policies @@ -8110,15 +8232,17 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) { break } count++ - found = append(found, raw.(*structs.ScalingPolicy).Target) + found = append(found, raw.(*structs.ScalingPolicy).Target[structs.ScalingTargetGroup]) } require.Equal(1, count) sort.Strings(found) - expect := []string{policyA.Target} + expect := []string{policyA.Target[structs.ScalingTargetGroup]} sort.Strings(expect) require.Equal(expect, found) - iter, err = state.ScalingPoliciesByJob(nil, policyB1.Namespace, policyB1.JobID) + iter, err = state.ScalingPoliciesByJob(nil, + policyB1.Target[structs.ScalingTargetNamespace], + policyB1.Target[structs.ScalingTargetJob]) require.NoError(err) // Ensure we see expected policies @@ -8130,11 +8254,14 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) { break } count++ - found = append(found, raw.(*structs.ScalingPolicy).Target) + found = append(found, raw.(*structs.ScalingPolicy).Target[structs.ScalingTargetGroup]) } require.Equal(2, count) sort.Strings(found) - expect = []string{policyB1.Target, policyB2.Target} + expect = []string{ + policyB1.Target[structs.ScalingTargetGroup], + policyB2.Target[structs.ScalingTargetGroup], + } sort.Strings(expect) require.Equal(expect, found) } diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 11c48de04..8e1abc6e2 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -13,6 +13,7 @@ import ( multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/nomad/acl" + "github.com/mitchellh/copystructure" "golang.org/x/crypto/blake2b" ) @@ -257,16 +258,22 @@ func CopyScalingPolicy(p *ScalingPolicy) *ScalingPolicy { return nil } + opaquePolicyConfig, err := copystructure.Copy(p.Policy) + if err != nil { + panic(err.Error()) + } + c := ScalingPolicy{ ID: p.ID, - Namespace: p.Namespace, - Target: p.Target, - JobID: p.JobID, - Policy: p.Policy, + Policy: opaquePolicyConfig.(map[string]interface{}), Enabled: p.Enabled, CreateIndex: p.CreateIndex, ModifyIndex: p.ModifyIndex, } + c.Target = make(map[string]string, len(p.Target)) + for k, v := range p.Target { + c.Target[k] = v + } return &c } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8ec97290e..f0e86d80f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4619,18 +4619,18 @@ type ScalingPolicy struct { // ID is a generated UUID used for looking up the scaling policy ID string - // Namespace is the namespace for the containing job - Namespace string - - // Target is the scaling target; there can be only one policy per scaling target - Target string - - // JobID is the ID of the parent job; there can be multiple policies per job - JobID string + // Target contains information about the target of the scaling policy, like job and group + Target map[string]string // Policy is an opaque description of the scaling policy, passed to the autoscaler Policy map[string]interface{} + // Min is the minimum allowable scaling count for this target + Min int64 + + // Max is the maximum allowable scaling count for this target + Max int64 + // Enabled indicates whether this policy has been enabled/disabled Enabled bool @@ -4638,20 +4638,42 @@ type ScalingPolicy struct { ModifyIndex uint64 } +const ( + ScalingTargetNamespace = "Namespace" + ScalingTargetJob = "Job" + ScalingTargetGroup = "Group" +) + +// Diff indicates whether the specification for a given scaling policy has changed +func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { + copy := *p2 + copy.ID = p.ID + copy.CreateIndex = p.CreateIndex + copy.ModifyIndex = p.ModifyIndex + return !reflect.DeepEqual(*p, copy) +} + func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { - p.Target = fmt.Sprintf("/v1/job/%s/%s/scale", job.ID, tg.Name) + p.Target = map[string]string{ + ScalingTargetNamespace: job.Namespace, + ScalingTargetJob: job.ID, + ScalingTargetGroup: tg.Name, + } return p } func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { - return &ScalingPolicyListStub{ + stub := &ScalingPolicyListStub{ ID: p.ID, - JobID: p.JobID, - Target: p.Target, + Target: make(map[string]string), Enabled: p.Enabled, CreateIndex: p.CreateIndex, ModifyIndex: p.ModifyIndex, } + for k, v := range p.Target { + stub.Target[k] = v + } + return stub } // GetScalingPolicies returns a slice of all scaling scaling policies for this job @@ -4671,9 +4693,8 @@ func (j *Job) GetScalingPolicies() []*ScalingPolicy { // for the scaling policy list type ScalingPolicyListStub struct { ID string - JobID string - Target string Enabled bool + Target map[string]string CreateIndex uint64 ModifyIndex uint64 }