finished refactoring state store, schema, etc

This commit is contained in:
Chris Baker
2020-02-21 21:23:30 +00:00
parent c095f41111
commit 3b4a1aecd9
11 changed files with 383 additions and 121 deletions

View File

@@ -39,6 +39,7 @@ type ScalingRequest struct {
Value interface{}
Reason string
Error string
Meta map[string]interface{}
WriteRequest
// this is effectively a job update, so we need the ability to override policy.
PolicyOverride bool
@@ -48,8 +49,9 @@ type ScalingRequest struct {
type ScalingPolicy struct {
ID string
Namespace string
Target string
JobID string
Target map[string]string
Min int64
Max int64
Policy map[string]interface{}
Enabled *bool
CreateIndex uint64
@@ -60,8 +62,8 @@ type ScalingPolicy struct {
// for the scaling policy list
type ScalingPolicyListStub struct {
ID string
JobID string
Target string
Enabled bool
Target map[string]string
CreateIndex uint64
ModifyIndex uint64
}

View File

@@ -1,13 +1,12 @@
package api
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestScalingPolicies_List(t *testing.T) {
func TestScalingPolicies_ListPolicies(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -36,12 +35,18 @@ func TestScalingPolicies_List(t *testing.T) {
policy := policies[0]
// Check that the scaling policy references the right job
require.Equalf(policy.JobID, *job.ID, "expected JobID=%s, got: %s", *job.ID, policy.JobID)
// Check that the scaling policy references the right namespace
namespace := DefaultNamespace
if job.Namespace != nil && *job.Namespace != "" {
namespace = *job.Namespace
}
require.Equal(policy.Target["Namespace"], namespace)
// Check that the scaling policy references the right target
expectedTarget := fmt.Sprintf("/v1/job/%s/%s/scale", *job.ID, *job.TaskGroups[0].Name)
require.Equalf(expectedTarget, policy.Target, "expected Target=%s, got: %s", expectedTarget, policy.Target)
// Check that the scaling policy references the right job
require.Equal(policy.Target["Job"], *job.ID)
// Check that the scaling policy references the right group
require.Equal(policy.Target["Group"], *job.TaskGroups[0].Name)
}
func TestScalingPolicies_GetPolicy(t *testing.T) {
@@ -80,7 +85,7 @@ func TestScalingPolicies_GetPolicy(t *testing.T) {
policies, _, err := scaling.ListPolicies(nil)
require.NoError(err)
for _, p := range policies {
if p.JobID == *job.ID {
if p.Target["Job"] == *job.ID {
policyID = p.ID
break
}
@@ -94,8 +99,16 @@ func TestScalingPolicies_GetPolicy(t *testing.T) {
require.NoError(err)
// Check that the scaling policy fields match
expectedTarget := fmt.Sprintf("/v1/job/%s/%s/scale", *job.ID, *job.TaskGroups[0].Name)
require.Equalf(expectedTarget, resp.Target, "expected Target=%s, got: %s", expectedTarget, policy.Target)
namespace := DefaultNamespace
if job.Namespace != nil && *job.Namespace != "" {
namespace = *job.Namespace
}
expectedTarget := map[string]string{
"Namespace": namespace,
"Job": *job.ID,
"Group": *job.TaskGroups[0].Name,
}
require.Equal(expectedTarget, resp.Target)
require.Equal(policy.Policy, resp.Policy)
require.Equal(policy.Enabled, resp.Enabled)
}

View File

@@ -824,7 +824,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
}
if taskGroup.Scaling != nil {
tg.Scaling = ApiScalingPolicyToStructs(job, taskGroup.Scaling).TargetTaskGroup(job, tg)
tg.Scaling = ApiScalingPolicyToStructs(taskGroup.Scaling).TargetTaskGroup(job, tg)
}
tg.EphemeralDisk = &structs.EphemeralDisk{

View File

@@ -75,11 +75,12 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ
return out.Policy, nil
}
func ApiScalingPolicyToStructs(job *structs.Job, a1 *api.ScalingPolicy) *structs.ScalingPolicy {
func ApiScalingPolicyToStructs(ap *api.ScalingPolicy) *structs.ScalingPolicy {
return &structs.ScalingPolicy{
Namespace: job.Namespace,
JobID: job.ID,
Enabled: *a1.Enabled,
Policy: a1.Policy,
Enabled: *ap.Enabled,
Min: ap.Min,
Max: ap.Max,
Policy: ap.Policy,
Target: map[string]string{},
}
}

View File

@@ -14,13 +14,14 @@ import (
"github.com/golang/snappy"
"github.com/hashicorp/consul/lib"
"github.com/pkg/errors"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/pkg/errors"
)
const (
@@ -189,6 +190,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
// Ensure that all scaling policies have an appropriate ID
if err := propagateScalingPolicyIDs(existingJob, args.Job); err != nil {
return err
}
// Ensure that the job has permissions for the requested Vault tokens
policies := args.Job.VaultPolicies()
if len(policies) != 0 {
@@ -332,6 +338,31 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}
// propagateScalingPolicyIDs carries scaling policy IDs forward from an
// existing job to its updated version, and generates fresh IDs for policies
// that have no predecessor. This keeps policy IDs stable across job updates.
//
// oldJob may be nil (new job registration); newJob must not be nil.
// The function currently cannot fail; the error return is kept so callers
// can treat it uniformly with other registration steps.
func propagateScalingPolicyIDs(oldJob, newJob *structs.Job) error {
	// Jobs currently only have scaling policies on task groups, so the task
	// group name identifies corresponding policies across job versions.
	existingIDs := make(map[string]string)
	if oldJob != nil {
		for _, p := range oldJob.GetScalingPolicies() {
			existingIDs[p.Target[structs.ScalingTargetGroup]] = p.ID
		}
	}

	// Ignore any ID already set on the incoming policies (they should be
	// empty); the authoritative ID comes from the old job when one exists.
	for _, p := range newJob.GetScalingPolicies() {
		if id, ok := existingIDs[p.Target[structs.ScalingTargetGroup]]; ok {
			p.ID = id
		} else {
			p.ID = uuid.Generate()
		}
	}

	return nil
}
// getSignalConstraint builds a suitable constraint based on the required
// signals
func getSignalConstraint(signals []string) *structs.Constraint {

View File

@@ -1256,10 +1256,12 @@ func ACLManagementToken() *structs.ACLToken {
func ScalingPolicy() *structs.ScalingPolicy {
return &structs.ScalingPolicy{
ID: uuid.Generate(),
Namespace: structs.DefaultNamespace,
Target: uuid.Generate(),
JobID: uuid.Generate(),
ID: uuid.Generate(),
Target: map[string]string{
structs.ScalingTargetNamespace: structs.DefaultNamespace,
structs.ScalingTargetJob: uuid.Generate(),
structs.ScalingTargetGroup: uuid.Generate(),
},
Policy: map[string]interface{}{
"a": "b",
},
@@ -1272,11 +1274,9 @@ func ScalingPolicy() *structs.ScalingPolicy {
func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
job := Job()
policy := &structs.ScalingPolicy{
ID: uuid.Generate(),
Namespace: job.Namespace,
JobID: job.ID,
Policy: map[string]interface{}{},
Enabled: true,
ID: uuid.Generate(),
Policy: map[string]interface{}{},
Enabled: true,
}
policy.TargetTaskGroup(job, job.TaskGroups[0])
job.TaskGroups[0].Scaling = policy

View File

@@ -5,6 +5,7 @@ import (
"sync"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -730,6 +731,63 @@ func csiPluginTableSchema() *memdb.TableSchema {
}
}
// ScalingPolicyTargetFieldIndex is a memdb indexer that extracts a single
// named key from a ScalingPolicy's Target map and builds an index on its
// value.
type ScalingPolicyTargetFieldIndex struct {
	// Field is the key to look up in the policy's Target map.
	Field string
}
// FromObject extracts an index value from a ScalingPolicy, or reports that
// the value is missing (nil Target map, or the configured field absent).
func (s *ScalingPolicyTargetFieldIndex) FromObject(obj interface{}) (bool, []byte, error) {
	p, isPolicy := obj.(*structs.ScalingPolicy)
	if !isPolicy {
		return false, nil, fmt.Errorf("object %#v is not a ScalingPolicy", obj)
	}

	if p.Target == nil {
		return false, nil, nil
	}

	fieldValue, present := p.Target[s.Field]
	if !present {
		return false, nil, nil
	}

	// Null-terminate the value so it forms an exact-match index key.
	return true, []byte(fieldValue + "\x00"), nil
}
// FromArgs builds an exact index lookup key from a single string argument.
func (s *ScalingPolicyTargetFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("must provide only a single argument")
	}

	str, isString := args[0].(string)
	if !isString {
		return nil, fmt.Errorf("argument must be a string: %#v", args[0])
	}

	// The null terminator marks the end of the exact value.
	return []byte(str + "\x00"), nil
}
// PrefixFromArgs returns a key suitable for prefix scanning, built from the
// same arguments as FromArgs but without the trailing null terminator.
func (s *ScalingPolicyTargetFieldIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
	key, err := s.FromArgs(args...)
	if err != nil {
		return nil, err
	}

	// Strip the null terminator appended by FromArgs; the remainder is the
	// scan prefix.
	if l := len(key); l > 0 {
		key = key[:l-1]
	}
	return key, nil
}
// scalingPolicyTableSchema returns the MemDB schema for the scaling policy table.
// This table stores the scaling policies, indexed by ID and by target.
func scalingPolicyTableSchema() *memdb.TableSchema {
@@ -742,49 +800,32 @@ func scalingPolicyTableSchema() *memdb.TableSchema {
AllowMissing: false,
Unique: true,
// Use a compound index so the tuple of (Namespace, Target) is
// uniquely identifying
// UUID is uniquely identifying
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
// Target index is used for looking up by target or listing policies in namespace
// A target can only have a single scaling policy, so this is guaranteed to be unique.
// Target index is used for listing by namespace or job, or looking up a specific target.
// A given task group can have only a single scaling policy, so this is guaranteed to be unique.
"target": {
Name: "target",
AllowMissing: false,
Unique: true,
// Use a compound index so the tuple of (Namespace, Target) is
// Use a compound index so the tuple of (Namespace, Job, Group) is
// uniquely identifying
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
&ScalingPolicyTargetFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "Target",
},
},
},
},
// Job index is used to lookup scaling policies by job
"job": {
Name: "job",
AllowMissing: false,
Unique: false,
// Use a compound index so the tuple of (Namespace, JobID) is
// uniquely identifying
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
&ScalingPolicyTargetFieldIndex{
Field: "Job",
},
&memdb.StringFieldIndex{
Field: "JobID",
&ScalingPolicyTargetFieldIndex{
Field: "Group",
},
},
},

View File

@@ -10,10 +10,10 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/pkg/errors"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// Txn is a transaction against a state store.
@@ -1284,12 +1284,15 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return fmt.Errorf("index update failed: %v", err)
}
// Delete the job scaling policies
if _, err := txn.DeleteAll("scaling_policy", "job", namespace, jobID); err != nil {
// Delete any job scaling policies
numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", namespace, jobID)
if err != nil {
return fmt.Errorf("deleting job scaling policies failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
if numDeletedScalingPolicies > 0 {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
return nil
@@ -4002,7 +4005,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
scalingPolicies := job.GetScalingPolicies()
newTargets := map[string]struct{}{}
for _, p := range scalingPolicies {
newTargets[p.Target] = struct{}{}
newTargets[p.Target[structs.ScalingTargetGroup]] = struct{}{}
}
// find existing policies that need to be deleted
deletedPolicies := []string{}
@@ -4016,7 +4019,7 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
break
}
oldPolicy := raw.(*structs.ScalingPolicy)
if _, ok := newTargets[oldPolicy.Target]; !ok {
if _, ok := newTargets[oldPolicy.Target[structs.ScalingTargetGroup]]; !ok {
deletedPolicies = append(deletedPolicies, oldPolicy.ID)
}
}
@@ -4720,35 +4723,45 @@ func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*stru
func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy,
txn *memdb.Txn) error {
for _, scalingPolicy := range scalingPolicies {
hadUpdates := false
for _, policy := range scalingPolicies {
// Check if the scaling policy already exists
existing, err := txn.First("scaling_policy", "target", scalingPolicy.Namespace, scalingPolicy.Target)
existing, err := txn.First("scaling_policy", "target",
policy.Target[structs.ScalingTargetNamespace],
policy.Target[structs.ScalingTargetJob],
policy.Target[structs.ScalingTargetGroup])
if err != nil {
return fmt.Errorf("scaling policy lookup failed: %v", err)
}
// Setup the indexes correctly
if existing != nil {
scalingPolicy.CreateIndex = existing.(*structs.ScalingPolicy).CreateIndex
scalingPolicy.ModifyIndex = index
scalingPolicy.ID = existing.(*structs.ScalingPolicy).ID
} else {
scalingPolicy.CreateIndex = index
scalingPolicy.ModifyIndex = index
if scalingPolicy.ID == "" {
scalingPolicy.ID = uuid.Generate()
p := existing.(*structs.ScalingPolicy)
if !p.Diff(policy) {
continue
}
policy.ID = p.ID
policy.CreateIndex = p.CreateIndex
policy.ModifyIndex = index
} else {
// policy.ID must have been set already in Job.Register before log apply
policy.CreateIndex = index
policy.ModifyIndex = index
}
// Insert the scaling policy
if err := txn.Insert("scaling_policy", scalingPolicy); err != nil {
hadUpdates = true
if err := txn.Insert("scaling_policy", policy); err != nil {
return err
}
}
// Update the indexes table for scaling policy
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
if hadUpdates {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
return nil
@@ -4815,7 +4828,7 @@ func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID st
func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
txn *memdb.Txn) (memdb.ResultIterator, error) {
iter, err := txn.Get("scaling_policy", "job", namespace, jobID)
iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID)
if err != nil {
return nil, err
}
@@ -4840,10 +4853,16 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S
return nil, nil
}
func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, namespace, target string) (*structs.ScalingPolicy, error) {
func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, target map[string]string) (*structs.ScalingPolicy,
error) {
txn := s.db.Txn(false)
watchCh, existing, err := txn.FirstWatch("scaling_policy", "target", namespace, target)
// currently, the only supported scaling policy target type is a task group
namespace := target[structs.ScalingTargetNamespace]
job := target[structs.ScalingTargetJob]
group := target[structs.ScalingTargetGroup]
watchCh, existing, err := txn.FirstWatch("scaling_policy", "target", namespace, job, group)
if err != nil {
return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
}

View File

@@ -10,12 +10,13 @@ import (
"time"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func testStateStore(t *testing.T) *StateStore {
@@ -7928,10 +7929,10 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) {
policy2 := mock.ScalingPolicy()
ws := memdb.NewWatchSet()
_, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
_, err := state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
_, err = state.ScalingPolicyByTarget(ws, policy.Namespace, policy2.Target)
_, err = state.ScalingPolicyByTarget(ws, policy2.Target)
require.NoError(err)
err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy, policy2})
@@ -7939,15 +7940,15 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) {
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
out, err := state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.Equal(policy, out)
out, err = state.ScalingPolicyByTarget(ws, policy2.Namespace, policy2.Target)
out, err = state.ScalingPolicyByTarget(ws, policy2.Target)
require.NoError(err)
require.Equal(policy2, out)
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Namespace)
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace])
require.NoError(err)
// Ensure we see both policies
@@ -7977,7 +7978,7 @@ func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
out, err := state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.Nil(out)
@@ -7987,7 +7988,7 @@ func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
require.True(watchFired(ws), "watch did not fire")
ws = memdb.NewWatchSet()
out, err = state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
out, err = state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.NotNil(out)
require.Equal(newIndex, out.CreateIndex)
@@ -7997,6 +7998,94 @@ func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
require.Equal(newIndex, index)
}
// Scaling Policy IDs are generated randomly during Job.Register.
// Subsequent updates of the job should preserve the ID for the scaling
// policy associated with a given target, and should not advance the
// scaling_policy table index when the policy itself is unchanged.
func TestStateStore_UpsertJob_PreserveScalingPolicyIDsAndIndex(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	state := testStateStore(t)
	job, policy := mock.JobWithScalingPolicy()

	var newIndex uint64 = 1000
	err := state.UpsertJob(newIndex, job)
	require.NoError(err)

	ws := memdb.NewWatchSet()
	p1, err := state.ScalingPolicyByTarget(ws, policy.Target)
	require.NoError(err)
	require.NotNil(p1)
	require.Equal(newIndex, p1.CreateIndex)
	require.Equal(newIndex, p1.ModifyIndex)

	index, err := state.Index("scaling_policy")
	require.NoError(err)
	require.Equal(newIndex, index)
	require.NotEmpty(p1.ID)

	// Update the job without touching the scaling policy.
	job.Meta["new-meta"] = "new-value"
	newIndex += 100
	err = state.UpsertJob(newIndex, job)
	require.NoError(err)
	require.False(watchFired(ws), "watch should not have fired")

	p2, err := state.ScalingPolicyByTarget(nil, policy.Target)
	require.NoError(err)
	require.NotNil(p2)
	require.Equal(p1.ID, p2.ID, "ID should not have changed")
	require.Equal(p1.CreateIndex, p2.CreateIndex)
	require.Equal(p1.ModifyIndex, p2.ModifyIndex)

	index, err = state.Index("scaling_policy")
	require.NoError(err)
	require.Equal(index, p1.CreateIndex, "table index should not have changed")
}
// Updating the scaling policy for a job should update the index table and
// fire the watch. This test is the converse of
// TestStateStore_UpsertJob_PreserveScalingPolicyIDsAndIndex.
func TestStateStore_UpsertJob_UpdateScalingPolicy(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	state := testStateStore(t)
	job, policy := mock.JobWithScalingPolicy()

	var oldIndex uint64 = 1000
	require.NoError(state.UpsertJob(oldIndex, job))

	ws := memdb.NewWatchSet()
	p1, err := state.ScalingPolicyByTarget(ws, policy.Target)
	require.NoError(err)
	require.NotNil(p1)
	require.Equal(oldIndex, p1.CreateIndex)
	require.Equal(oldIndex, p1.ModifyIndex)
	prevID := p1.ID

	index, err := state.Index("scaling_policy")
	require.NoError(err)
	require.Equal(oldIndex, index)
	require.NotEmpty(p1.ID)

	// Update the job with a modified scaling policy; use a copy so the
	// stored policy object is not mutated in place.
	newPolicy := structs.CopyScalingPolicy(p1)
	newPolicy.Policy["new-field"] = "new-value"
	job.TaskGroups[0].Scaling = newPolicy
	require.NoError(state.UpsertJob(oldIndex+100, job))
	require.True(watchFired(ws), "watch should have fired")

	p2, err := state.ScalingPolicyByTarget(nil, policy.Target)
	require.NoError(err)
	require.NotNil(p2)
	require.Equal(p2.Policy["new-field"], "new-value")
	require.Equal(prevID, p2.ID, "ID should not have changed")
	require.Equal(oldIndex, p2.CreateIndex)
	require.Greater(p2.ModifyIndex, oldIndex, "ModifyIndex should have advanced")

	index, err = state.Index("scaling_policy")
	require.NoError(err)
	require.Greater(index, oldIndex, "table index should have advanced")
}
func TestStateStore_DeleteScalingPolicies(t *testing.T) {
t.Parallel()
@@ -8012,7 +8101,7 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
// Create a watcher
ws := memdb.NewWatchSet()
_, err = state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
_, err = state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
// Delete the policy
@@ -8024,17 +8113,17 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
// Ensure we don't get the objects back
ws = memdb.NewWatchSet()
out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
out, err := state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.Nil(out)
ws = memdb.NewWatchSet()
out, err = state.ScalingPolicyByTarget(ws, policy2.Namespace, policy2.Target)
out, err = state.ScalingPolicyByTarget(ws, policy2.Target)
require.NoError(err)
require.Nil(out)
// Ensure we see both policies
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Namespace)
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace])
require.NoError(err)
count := 0
for {
@@ -8065,7 +8154,7 @@ func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) {
require.NoError(err)
policy := mock.ScalingPolicy()
policy.JobID = job.ID
policy.Target[structs.ScalingTargetJob] = job.ID
err = state.UpsertScalingPolicies(1001, []*structs.ScalingPolicy{policy})
require.NoError(err)
@@ -8075,13 +8164,44 @@ func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) {
// Ensure the scaling policy was deleted
ws := memdb.NewWatchSet()
out, err := state.ScalingPolicyByTarget(ws, policy.Namespace, policy.Target)
out, err := state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.Nil(out)
index, err := state.Index("scaling_policy")
require.True(index > 1001)
}
// This test ensures that a job with no scaling policies does not bump the
// scaling_policy table index on either job registration or deletion.
func TestStateStore_DeleteJob_ScalingPolicyIndexNoop(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	state := testStateStore(t)
	job := mock.Job()

	prevIndex, err := state.Index("scaling_policy")
	require.NoError(err)

	// Register the job, then confirm the table index is untouched.
	require.NoError(state.UpsertJob(1000, job))
	newIndex, err := state.Index("scaling_policy")
	require.NoError(err)
	require.Equal(prevIndex, newIndex)

	// Delete the job, then confirm the table index is still untouched.
	require.NoError(state.DeleteJob(1002, job.Namespace, job.ID))
	newIndex, err = state.Index("scaling_policy")
	require.NoError(err)
	require.Equal(prevIndex, newIndex)
}
func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
t.Parallel()
@@ -8091,14 +8211,16 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
policyA := mock.ScalingPolicy()
policyB1 := mock.ScalingPolicy()
policyB2 := mock.ScalingPolicy()
policyB1.JobID = policyB2.JobID
policyB1.Target[structs.ScalingTargetJob] = policyB2.Target[structs.ScalingTargetJob]
// Create the policies
var baseIndex uint64 = 1000
err := state.UpsertScalingPolicies(baseIndex, []*structs.ScalingPolicy{policyA, policyB1, policyB2})
require.NoError(err)
iter, err := state.ScalingPoliciesByJob(nil, policyA.Namespace, policyA.JobID)
iter, err := state.ScalingPoliciesByJob(nil,
policyA.Target[structs.ScalingTargetNamespace],
policyA.Target[structs.ScalingTargetJob])
require.NoError(err)
// Ensure we see expected policies
@@ -8110,15 +8232,17 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
break
}
count++
found = append(found, raw.(*structs.ScalingPolicy).Target)
found = append(found, raw.(*structs.ScalingPolicy).Target[structs.ScalingTargetGroup])
}
require.Equal(1, count)
sort.Strings(found)
expect := []string{policyA.Target}
expect := []string{policyA.Target[structs.ScalingTargetGroup]}
sort.Strings(expect)
require.Equal(expect, found)
iter, err = state.ScalingPoliciesByJob(nil, policyB1.Namespace, policyB1.JobID)
iter, err = state.ScalingPoliciesByJob(nil,
policyB1.Target[structs.ScalingTargetNamespace],
policyB1.Target[structs.ScalingTargetJob])
require.NoError(err)
// Ensure we see expected policies
@@ -8130,11 +8254,14 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
break
}
count++
found = append(found, raw.(*structs.ScalingPolicy).Target)
found = append(found, raw.(*structs.ScalingPolicy).Target[structs.ScalingTargetGroup])
}
require.Equal(2, count)
sort.Strings(found)
expect = []string{policyB1.Target, policyB2.Target}
expect = []string{
policyB1.Target[structs.ScalingTargetGroup],
policyB2.Target[structs.ScalingTargetGroup],
}
sort.Strings(expect)
require.Equal(expect, found)
}

View File

@@ -13,6 +13,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/acl"
"github.com/mitchellh/copystructure"
"golang.org/x/crypto/blake2b"
)
@@ -257,16 +258,22 @@ func CopyScalingPolicy(p *ScalingPolicy) *ScalingPolicy {
return nil
}
opaquePolicyConfig, err := copystructure.Copy(p.Policy)
if err != nil {
panic(err.Error())
}
c := ScalingPolicy{
ID: p.ID,
Namespace: p.Namespace,
Target: p.Target,
JobID: p.JobID,
Policy: p.Policy,
Policy: opaquePolicyConfig.(map[string]interface{}),
Enabled: p.Enabled,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
}
c.Target = make(map[string]string, len(p.Target))
for k, v := range p.Target {
c.Target[k] = v
}
return &c
}

View File

@@ -4619,18 +4619,18 @@ type ScalingPolicy struct {
// ID is a generated UUID used for looking up the scaling policy
ID string
// Namespace is the namespace for the containing job
Namespace string
// Target is the scaling target; there can be only one policy per scaling target
Target string
// JobID is the ID of the parent job; there can be multiple policies per job
JobID string
// Target contains information about the target of the scaling policy, like job and group
Target map[string]string
// Policy is an opaque description of the scaling policy, passed to the autoscaler
Policy map[string]interface{}
// Min is the minimum allowable scaling count for this target
Min int64
// Max is the maximum allowable scaling count for this target
Max int64
// Enabled indicates whether this policy has been enabled/disabled
Enabled bool
@@ -4638,20 +4638,42 @@ type ScalingPolicy struct {
ModifyIndex uint64
}
// Well-known keys into ScalingPolicy.Target identifying the scaled resource.
const (
	ScalingTargetNamespace = "Namespace"
	ScalingTargetJob       = "Job"
	ScalingTargetGroup     = "Group"
)
// Diff indicates whether the specification for a given scaling policy has
// changed relative to p2, ignoring ID and the raft index fields, which are
// bookkeeping rather than part of the user-supplied specification.
func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool {
	// Compare against a shallow copy of p2 with the non-spec fields
	// normalized to p's values. Renamed from `copy` to avoid shadowing the
	// builtin.
	other := *p2
	other.ID = p.ID
	other.CreateIndex = p.CreateIndex
	other.ModifyIndex = p.ModifyIndex
	return !reflect.DeepEqual(*p, other)
}
func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy {
p.Target = fmt.Sprintf("/v1/job/%s/%s/scale", job.ID, tg.Name)
p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace,
ScalingTargetJob: job.ID,
ScalingTargetGroup: tg.Name,
}
return p
}
func (p *ScalingPolicy) Stub() *ScalingPolicyListStub {
return &ScalingPolicyListStub{
stub := &ScalingPolicyListStub{
ID: p.ID,
JobID: p.JobID,
Target: p.Target,
Target: make(map[string]string),
Enabled: p.Enabled,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
}
for k, v := range p.Target {
stub.Target[k] = v
}
return stub
}
// GetScalingPolicies returns a slice of all scaling policies for this job
@@ -4671,9 +4693,8 @@ func (j *Job) GetScalingPolicies() []*ScalingPolicy {
// for the scaling policy list
type ScalingPolicyListStub struct {
ID string
JobID string
Target string
Enabled bool
Target map[string]string
CreateIndex uint64
ModifyIndex uint64
}