diff --git a/api/jobs.go b/api/jobs.go index 00d2431d3..a205fba10 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -169,6 +169,18 @@ func (j *Jobs) Scale(jobID, group string, value interface{}, reason string, q *W return &resp, qm, nil } +// ScaleStatus is used to retrieve information about a particular +// job given its unique ID. +func (j *Jobs) ScaleStatus(jobID, group string, q *QueryOptions) (*ScaleStatusResponse, *QueryMeta, error) { + var resp ScaleStatusResponse + qm, err := j.client.query(fmt.Sprintf("/v1/job/%s/%s/scale", url.PathEscape(jobID), url.PathEscape(group)), + &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + // Versions is used to retrieve all versions of a particular job given its // unique ID. func (j *Jobs) Versions(jobID string, diffs bool, q *QueryOptions) ([]*Job, []*JobDiff, *QueryMeta, error) { diff --git a/api/jobs_test.go b/api/jobs_test.go index 160a8a121..d9bcbec27 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -1533,3 +1533,89 @@ func TestJobs_AddSpread(t *testing.T) { t.Fatalf("expect: %#v, got: %#v", expect, job.Spreads) } } + +// TestJobs_ScaleAction tests the scale target for task group count +func TestJobs_ScaleAction(t *testing.T) { + t.Parallel() + + require := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + id := "job-id/with\\troublesome:characters\n?&字\000" + job := testJobWithScalingPolicy() + job.ID = &id + groupName := *job.TaskGroups[0].Name + groupCount := *job.TaskGroups[0].Count + + // Trying to scale against a target before it exists returns an error + _, _, err := jobs.Scale(id, "missing", + groupCount+1, "this won't work", nil) + require.Error(err) + require.Contains(err.Error(), "not found") + + // Register the job + _, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Perform a scaling action with bad group name, verify error + _, _, err = jobs.Scale(id, "incorrect-group-name", + groupCount+1, "this won't work", nil) + require.Error(err) + require.Contains(err.Error(), "does not exist") + + // Query the scaling endpoint and verify success + resp1, wm, err := jobs.Scale(id, groupName, + groupCount+1, "need more instances", nil) + + require.NoError(err) + require.NotNil(resp1) + require.NotEmpty(resp1.EvalID) + assertWriteMeta(t, wm) +} + +// TestJobs_ScaleStatus tests the /scale status endpoint for task group count +func TestJobs_ScaleStatus(t *testing.T) { + t.Parallel() + + require := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Trying to retrieve a status before it exists returns an error + id := "job-id/with\\troublesome:characters\n?&字\000" + _, _, err := jobs.ScaleStatus(id, "missing", nil) + require.Error(err) + require.Contains(err.Error(), "not found") + + // Register the job + job := testJobWithScalingPolicy() + job.ID = &id + groupName := *job.TaskGroups[0].Name + groupCount := *job.TaskGroups[0].Count + _, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Query the scaling endpoint with bad group name, verify error + _, _, err = jobs.ScaleStatus(id, "incorrect-group-name", nil) + require.Error(err) + require.Contains(err.Error(), "not found") + + // Query the scaling endpoint and verify success + result, qm, err := jobs.ScaleStatus(id, groupName, nil) + require.NoError(err) + assertQueryMeta(t, qm) + + // Check that the result is what we expect + require.Equal(groupCount, int(result.Value.(float64))) +} diff --git a/api/util_test.go b/api/util_test.go index 2ebf502d7..f32d6d30d 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -48,6 +48,15 @@ func testJob() *Job { return job } +func testJobWithScalingPolicy() *Job { + job := testJob() + job.TaskGroups[0].Scaling = &ScalingPolicy{ + Policy: map[string]interface{}{}, + Enabled: boolToPtr(true), + } + return job +} + func testPeriodicJob() *Job { job := testJob().AddPeriodicConfig(&PeriodicConfig{ Enabled: boolToPtr(true), diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index d08ff8728..8f39ee708 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -3,7 +3,6 @@ package agent import ( "fmt" "net/http" - "regexp" "strconv" "strings" @@ -459,28 +458,29 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, func (s *HTTPServer) jobScale(resp http.ResponseWriter, req *http.Request, jobAndTarget string) (interface{}, error) { + + jobAndGroup := strings.TrimSuffix(jobAndTarget, "/scale") + var jobName, groupName string + if i := strings.LastIndex(jobAndGroup, "/"); i != -1 { + jobName = jobAndGroup[:i] + groupName = jobAndGroup[i+1:] + } + if jobName == "" || groupName == "" { + return nil, CodedError(400, "Invalid scaling target") + } + switch req.Method { case "GET": - return s.jobScaleStatus(resp, req, jobAndTarget) + return s.jobScaleStatus(resp, req, jobName, groupName) case "PUT", "POST": - return s.jobScaleAction(resp, req, jobAndTarget) + return s.jobScaleAction(resp, req, jobName, groupName) default: return nil, CodedError(405, ErrInvalidMethod) } } func (s *HTTPServer) jobScaleStatus(resp http.ResponseWriter, req *http.Request, - jobAndTarget string) (interface{}, error) { - - regJobGroup := regexp.MustCompile(`^(.+)/([^/]+)/scale$`) - var jobName, groupName string - if subMatch := regJobGroup.FindStringSubmatch(jobAndTarget); subMatch != nil { - jobName = subMatch[1] - groupName = subMatch[2] - } - if jobName == "" || groupName == "" { - return nil, CodedError(400, "Invalid scaling target") - } + jobName, groupName string) (interface{}, error) { args := structs.JobSpecificRequest{ JobID: jobName, @@ -513,7 +513,7 @@ func (s *HTTPServer) jobScaleStatus(resp http.ResponseWriter, req *http.Request, } func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request, - jobAndTarget string) (interface{}, error) { + jobName, groupName string) (interface{}, error) { if req.Method != "PUT" && req.Method != "POST" { return nil, CodedError(405, ErrInvalidMethod) @@ -527,18 +527,10 @@ func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request, if args.JobID == "" { return nil, CodedError(400, "Job ID must be specified") } - if !strings.HasPrefix(jobAndTarget, args.JobID) { + + if args.JobID != jobName { return nil, CodedError(400, "Job ID does not match") } - subTarget := strings.TrimPrefix(jobAndTarget, args.JobID) - groupScale := regexp.MustCompile(`^/([^/]+)/scale$`) - var groupName string - if subMatch := groupScale.FindStringSubmatch(subTarget); subMatch != nil { - groupName = subMatch[1] - } - if groupName == "" { - return nil, CodedError(400, "Invalid scaling target") - } scaleReq := structs.JobScaleRequest{ JobID: args.JobID, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index fd8962ae2..8a979b096 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -846,6 +846,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes if err != nil { return err } + if job == nil { + return fmt.Errorf("job %q not found", args.JobID) + } found := false for _, tg := range job.TaskGroups {