scaling api: more testing around the scaling events api

This commit is contained in:
Chris Baker
2020-04-01 16:14:54 +00:00
parent f23695e07a
commit 10ffa7eee5
6 changed files with 108 additions and 22 deletions

View File

@@ -155,9 +155,9 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
// Scale is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) Scale(jobID, group string, count *int,
message string, error bool, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta,
error) {
func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{},
q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
var count64 *int64
if count != nil {
count64 = int64ToPtr(int64(*count))

View File

@@ -1779,7 +1779,8 @@ func TestJobs_ScaleAction(t *testing.T) {
groupCount := *job.TaskGroups[0].Count
// Trying to scale against a target before it exists returns an error
_, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work", false, nil, nil)
_, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work",
false, nil, nil)
require.Error(err)
require.Contains(err.Error(), "not found")
@@ -1791,7 +1792,10 @@ func TestJobs_ScaleAction(t *testing.T) {
// Perform scaling action
newCount := groupCount + 1
scalingResp, wm, err := jobs.Scale(id, groupName,
intToPtr(newCount), "need more instances", false, nil, nil)
intToPtr(newCount), "need more instances", false,
map[string]interface{}{
"meta": "data",
}, nil)
require.NoError(err)
require.NotNil(scalingResp)
@@ -1805,7 +1809,71 @@ func TestJobs_ScaleAction(t *testing.T) {
require.NoError(err)
require.Equal(*resp.TaskGroups[0].Count, newCount)
// TODO: check that scaling event was persisted
// Check for the scaling event
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Len(status.TaskGroups[groupName].Events, 1)
scalingEvent := status.TaskGroups[groupName].Events[0]
require.False(scalingEvent.Error)
require.Equal("need more instances", scalingEvent.Message)
require.Equal(map[string]interface{}{
"meta": "data",
}, scalingEvent.Meta)
require.Greater(scalingEvent.Time, uint64(0))
require.NotNil(scalingEvent.EvalID)
require.Equal(scalingResp.EvalID, *scalingEvent.EvalID)
}
func TestJobs_ScaleAction_Error(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
id := "job-id/with\\troublesome:characters\n?&字\000"
job := testJobWithScalingPolicy()
job.ID = &id
groupName := *job.TaskGroups[0].Name
prevCount := *job.TaskGroups[0].Count
// Register the job
regResp, wm, err := jobs.Register(job, nil)
require.NoError(err)
assertWriteMeta(t, wm)
// Perform scaling action
scaleResp, wm, err := jobs.Scale(id, groupName, nil, "something bad happened", true,
map[string]interface{}{
"meta": "data",
}, nil)
require.NoError(err)
require.NotNil(scaleResp)
require.Empty(scaleResp.EvalID)
require.Empty(scaleResp.EvalCreateIndex)
assertWriteMeta(t, wm)
// Query the job again
resp, _, err := jobs.Info(*job.ID, nil)
require.NoError(err)
require.Equal(*resp.TaskGroups[0].Count, prevCount)
require.Equal(regResp.JobModifyIndex, scaleResp.JobModifyIndex)
require.Empty(scaleResp.EvalCreateIndex)
require.Empty(scaleResp.EvalID)
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Len(status.TaskGroups[groupName].Events, 1)
errEvent := status.TaskGroups[groupName].Events[0]
require.True(errEvent.Error)
require.Equal("something bad happened", errEvent.Message)
require.Equal(map[string]interface{}{
"meta": "data",
}, errEvent.Meta)
require.Greater(errEvent.Time, uint64(0))
require.Nil(errEvent.EvalID)
}
func TestJobs_ScaleAction_Noop(t *testing.T) {
@@ -1828,8 +1896,10 @@ func TestJobs_ScaleAction_Noop(t *testing.T) {
assertWriteMeta(t, wm)
// Perform scaling action
scaleResp, wm, err := jobs.Scale(id, groupName,
nil, "no count, just informative", false, nil, nil)
scaleResp, wm, err := jobs.Scale(id, groupName, nil, "no count, just informative",
false, map[string]interface{}{
"meta": "data",
}, nil)
require.NoError(err)
require.NotNil(scaleResp)
@@ -1847,7 +1917,15 @@ func TestJobs_ScaleAction_Noop(t *testing.T) {
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.NotEmpty(status.TaskGroups[groupName].Events)
require.Len(status.TaskGroups[groupName].Events, 1)
noopEvent := status.TaskGroups[groupName].Events[0]
require.False(noopEvent.Error)
require.Equal("no count, just informative", noopEvent.Message)
require.Equal(map[string]interface{}{
"meta": "data",
}, noopEvent.Meta)
require.Greater(noopEvent.Time, uint64(0))
require.Nil(noopEvent.EvalID)
}
// TestJobs_ScaleStatus tests the /scale status endpoint for task group count
@@ -1867,7 +1945,7 @@ func TestJobs_ScaleStatus(t *testing.T) {
require.Contains(err.Error(), "not found")
// Register the job
job := testJobWithScalingPolicy()
job := testJob()
job.ID = &id
groupName := *job.TaskGroups[0].Name
groupCount := *job.TaskGroups[0].Count

View File

@@ -2171,14 +2171,14 @@ func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink,
func (s *nomadSnapshot) persistScalingEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the scaling events
ws := memdb.NewWatchSet()
jobScalingEvents, err := s.snap.ScalingEvents(ws)
iter, err := s.snap.ScalingEvents(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := jobScalingEvents.Next()
raw := iter.Next()
if raw == nil {
break
}

View File

@@ -908,6 +908,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
now := time.Now().UTC().UnixNano()
// If the count is present, commit the job update via Raft
// for now, we'll do this even if count didn't change
if args.Count != nil {
truncCount := int(*args.Count)
if int64(truncCount) != *args.Count {
@@ -933,10 +934,10 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
reply.JobModifyIndex = job.ModifyIndex
}
// If the job is periodic or parameterized, we don't create an eval.
var eval *structs.Evaluation
// only create an eval for non-dispatch jobs and if the count was provided
// for now, we'll do this even if count didn't change
if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil {
eval = &structs.Evaluation{
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
@@ -979,8 +980,8 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
Meta: args.Meta,
},
}
if eval != nil {
event.ScalingEvent.EvalID = &eval.ID
if reply.EvalID != "" {
event.ScalingEvent.EvalID = &reply.EvalID
}
_, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event)
if err != nil {

View File

@@ -5415,6 +5415,7 @@ func TestJobEndpoint_Scale(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
require.NoError(err)
require.NotEmpty(resp.EvalID)
require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex)
}
func TestJobEndpoint_Scale_ACL(t *testing.T) {
@@ -5564,13 +5565,14 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
groupName := job.TaskGroups[0].Name
err := state.UpsertJob(1000, job)
require.Nil(err)
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: job.TaskGroups[0].Name,
structs.ScalingTargetGroup: groupName,
},
Count: nil, // no count => no eval
Message: "something informative",
@@ -5593,11 +5595,15 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) {
require.Empty(resp.EvalID)
require.Empty(resp.EvalCreateIndex)
events, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
jobEvents, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(events)
require.Contains(events, job.TaskGroups[0].Name)
require.NotEmpty(events[job.TaskGroups[0].Name])
require.NotNil(jobEvents)
require.Len(jobEvents, 1)
require.Contains(jobEvents, groupName)
groupEvents := jobEvents[groupName]
require.Len(groupEvents, 1)
event := groupEvents[0]
require.Nil(event.EvalID)
}
func TestJobEndpoint_GetScaleStatus(t *testing.T) {

View File

@@ -868,6 +868,7 @@ func scalingEventTableSchema() *memdb.TableSchema {
},
},
// TODO: need to figure out whether we want to index these or the jobs or ...
// "error": {
// Name: "error",
// AllowMissing: false,