From 10ffa7eee5597bd693eb51d8a321387d0370a81e Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 16:14:54 +0000 Subject: [PATCH] scaling api: more testing around the scaling events api --- api/jobs.go | 6 +-- api/jobs_test.go | 92 +++++++++++++++++++++++++++++++++++--- nomad/fsm.go | 4 +- nomad/job_endpoint.go | 11 ++--- nomad/job_endpoint_test.go | 16 ++++--- nomad/state/schema.go | 1 + 6 files changed, 108 insertions(+), 22 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 8668cc2bd..7514c608e 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -155,9 +155,9 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { // Scale is used to retrieve information about a particular // job given its unique ID. -func (j *Jobs) Scale(jobID, group string, count *int, - message string, error bool, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, - error) { +func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{}, + q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { + var count64 *int64 if count != nil { count64 = int64ToPtr(int64(*count)) diff --git a/api/jobs_test.go b/api/jobs_test.go index afcab5812..9d4cee430 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -1779,7 +1779,8 @@ func TestJobs_ScaleAction(t *testing.T) { groupCount := *job.TaskGroups[0].Count // Trying to scale against a target before it exists returns an error - _, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work", false, nil, nil) + _, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work", + false, nil, nil) require.Error(err) require.Contains(err.Error(), "not found") @@ -1791,7 +1792,10 @@ func TestJobs_ScaleAction(t *testing.T) { // Perform scaling action newCount := groupCount + 1 scalingResp, wm, err := jobs.Scale(id, groupName, - intToPtr(newCount), "need more instances", false, nil, nil) + intToPtr(newCount), "need more instances", false, + map[string]interface{}{ + "meta": "data", + }, nil) require.NoError(err) require.NotNil(scalingResp) @@ -1805,7 +1809,71 @@ func TestJobs_ScaleAction(t *testing.T) { require.NoError(err) require.Equal(*resp.TaskGroups[0].Count, newCount) - // TODO: check that scaling event was persisted + // Check for the scaling event + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.Len(status.TaskGroups[groupName].Events, 1) + scalingEvent := status.TaskGroups[groupName].Events[0] + require.False(scalingEvent.Error) + require.Equal("need more instances", scalingEvent.Message) + require.Equal(map[string]interface{}{ + "meta": "data", + }, scalingEvent.Meta) + require.Greater(scalingEvent.Time, uint64(0)) + require.NotNil(scalingEvent.EvalID) + require.Equal(scalingResp.EvalID, *scalingEvent.EvalID) +} + +func TestJobs_ScaleAction_Error(t *testing.T) { + t.Parallel() + require := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + id := "job-id/with\\troublesome:characters\n?&字\000" + job := testJobWithScalingPolicy() + job.ID = &id + groupName := *job.TaskGroups[0].Name + prevCount := *job.TaskGroups[0].Count + + // Register the job + regResp, wm, err := jobs.Register(job, nil) + require.NoError(err) + assertWriteMeta(t, wm) + + // Perform scaling action + scaleResp, wm, err := jobs.Scale(id, groupName, nil, "something bad happened", true, + map[string]interface{}{ + "meta": "data", + }, nil) + + require.NoError(err) + require.NotNil(scaleResp) + require.Empty(scaleResp.EvalID) + require.Empty(scaleResp.EvalCreateIndex) + assertWriteMeta(t, wm) + + // Query the job again + resp, _, err := jobs.Info(*job.ID, nil) + require.NoError(err) + require.Equal(*resp.TaskGroups[0].Count, prevCount) + require.Equal(regResp.JobModifyIndex, scaleResp.JobModifyIndex) + require.Empty(scaleResp.EvalCreateIndex) + require.Empty(scaleResp.EvalID) + + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.Len(status.TaskGroups[groupName].Events, 1) + errEvent := status.TaskGroups[groupName].Events[0] + require.True(errEvent.Error) + require.Equal("something bad happened", errEvent.Message) + require.Equal(map[string]interface{}{ + "meta": "data", + }, errEvent.Meta) + require.Greater(errEvent.Time, uint64(0)) + require.Nil(errEvent.EvalID) } func TestJobs_ScaleAction_Noop(t *testing.T) { @@ -1828,8 +1896,10 @@ func TestJobs_ScaleAction_Noop(t *testing.T) { assertWriteMeta(t, wm) // Perform scaling action - scaleResp, wm, err := jobs.Scale(id, groupName, - nil, "no count, just informative", false, nil, nil) + scaleResp, wm, err := jobs.Scale(id, groupName, nil, "no count, just informative", + false, map[string]interface{}{ + "meta": "data", + }, nil) require.NoError(err) require.NotNil(scaleResp) @@ -1847,7 +1917,15 @@ func TestJobs_ScaleAction_Noop(t *testing.T) { status, _, err := jobs.ScaleStatus(*job.ID, nil) require.NoError(err) - require.NotEmpty(status.TaskGroups[groupName].Events) + require.Len(status.TaskGroups[groupName].Events, 1) + noopEvent := status.TaskGroups[groupName].Events[0] + require.False(noopEvent.Error) + require.Equal("no count, just informative", noopEvent.Message) + require.Equal(map[string]interface{}{ + "meta": "data", + }, noopEvent.Meta) + require.Greater(noopEvent.Time, uint64(0)) + require.Nil(noopEvent.EvalID) } // TestJobs_ScaleStatus tests the /scale status endpoint for task group count @@ -1867,7 +1945,7 @@ func TestJobs_ScaleStatus(t *testing.T) { require.Contains(err.Error(), "not found") // Register the job - job := testJobWithScalingPolicy() + job := testJob() job.ID = &id groupName := *job.TaskGroups[0].Name groupCount := *job.TaskGroups[0].Count diff --git a/nomad/fsm.go b/nomad/fsm.go index f2a9ee7e9..97384a4a7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -2171,14 +2171,14 @@ func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink, func (s *nomadSnapshot) persistScalingEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the scaling events ws := memdb.NewWatchSet() - jobScalingEvents, err := s.snap.ScalingEvents(ws) + iter, err := s.snap.ScalingEvents(ws) if err != nil { return err } for { // Get the next item - raw := jobScalingEvents.Next() + raw := iter.Next() if raw == nil { break } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 169a98fa8..2761549f4 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -908,6 +908,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes now := time.Now().UTC().UnixNano() // If the count is present, commit the job update via Raft + // for now, we'll do this even if count didn't change if args.Count != nil { truncCount := int(*args.Count) if int64(truncCount) != *args.Count { @@ -933,10 +934,10 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes reply.JobModifyIndex = job.ModifyIndex } - // If the job is periodic or parameterized, we don't create an eval. - var eval *structs.Evaluation + // only create an eval for non-dispatch jobs and if the count was provided + // for now, we'll do this even if count didn't change if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil { - eval = &structs.Evaluation{ + eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), Priority: structs.JobDefaultPriority, @@ -979,8 +980,8 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes Meta: args.Meta, }, } - if eval != nil { - event.ScalingEvent.EvalID = &eval.ID + if reply.EvalID != "" { + event.ScalingEvent.EvalID = &reply.EvalID } _, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event) if err != nil { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 4dfb6204a..4e82bbeb9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5415,6 +5415,7 @@ func TestJobEndpoint_Scale(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) require.NoError(err) require.NotEmpty(resp.EvalID) + require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex) } func TestJobEndpoint_Scale_ACL(t *testing.T) { @@ -5564,13 +5565,14 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { state := s1.fsm.State() job := mock.Job() + groupName := job.TaskGroups[0].Name err := state.UpsertJob(1000, job) require.Nil(err) scale := &structs.JobScaleRequest{ JobID: job.ID, Target: map[string]string{ - structs.ScalingTargetGroup: job.TaskGroups[0].Name, + structs.ScalingTargetGroup: groupName, }, Count: nil, // no count => no eval Message: "something informative", @@ -5593,11 +5595,15 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Empty(resp.EvalID) require.Empty(resp.EvalCreateIndex) - events, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) + jobEvents, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) require.NoError(err) - require.NotNil(events) - require.Contains(events, job.TaskGroups[0].Name) - require.NotEmpty(events[job.TaskGroups[0].Name]) + require.NotNil(jobEvents) + require.Len(jobEvents, 1) + require.Contains(jobEvents, groupName) + groupEvents := jobEvents[groupName] + require.Len(groupEvents, 1) + event := groupEvents[0] + require.Nil(event.EvalID) } func TestJobEndpoint_GetScaleStatus(t *testing.T) { diff --git a/nomad/state/schema.go b/nomad/state/schema.go index ec9893f48..3f926188e 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -868,6 +868,7 @@ func scalingEventTableSchema() *memdb.TableSchema { }, }, + // TODO: need to figure out whether we want to index these or the jobs or ... // "error": { // Name: "error", // AllowMissing: false,