diff --git a/api/scaling.go b/api/scaling.go new file mode 100644 index 000000000..efa042ec7 --- /dev/null +++ b/api/scaling.go @@ -0,0 +1,22 @@ +package api + +// ScalingPolicy is the user-specified API object for an autoscaling policy +type ScalingPolicy struct { + Policy map[string]interface{} + Enabled *bool +} + +func (p *ScalingPolicy) Canonicalize() { + if p.Enabled == nil { + p.Enabled = boolToPtr(true) + } +} + +// ScalingRequeset is the payload for a generic scaling action +type ScalingRequest struct { + JobID string + Value interface{} + Reason string + WriteRequest + PolicyOverride bool +} diff --git a/api/tasks.go b/api/tasks.go index d4856ee2e..f7ee42a78 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -409,18 +409,6 @@ func (vm *VolumeMount) Canonicalize() { } } -// ScalingPolicy is the user-specified API object for an autoscaling policy -type ScalingPolicy struct { - Policy map[string]interface{} - Enabled *bool -} - -func (p *ScalingPolicy) Canonicalize() { - if p.Enabled == nil { - p.Enabled = boolToPtr(true) - } -} - // TaskGroup is the unit of scheduling. type TaskGroup struct { Name *string diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 83a020bc9..4db5f5b93 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -3,6 +3,7 @@ package agent import ( "fmt" "net/http" + "regexp" "strconv" "strings" @@ -82,6 +83,8 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/stable"): jobName := strings.TrimSuffix(path, "/stable") return s.jobStable(resp, req, jobName) + case strings.HasSuffix(path, "/scale"): + return s.jobScale(resp, req, path) default: return s.jobCRUD(resp, req, path) } @@ -454,6 +457,49 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return out, nil } +func (s *HTTPServer) jobScale(resp http.ResponseWriter, req *http.Request, + jobAndTarget string) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + var args api.ScalingRequest + if err := decodeBody(req, &args); err != nil { + return nil, CodedError(400, err.Error()) + } + + if args.JobID == "" { + return nil, CodedError(400, "Job ID must be specified") + } + if !strings.HasPrefix(jobAndTarget, args.JobID) { + return nil, CodedError(400, "Job ID does not match") + } + subTarget := strings.TrimPrefix(jobAndTarget, args.JobID) + groupScale := regexp.MustCompile(`/[^/]+/scale`) + groupName := groupScale.FindString(subTarget) + if groupName == "" { + return nil, CodedError(400, "Invalid scaling target") + } + + scaleReq := structs.JobScaleRequest{ + JobID: args.JobID, + GroupName: groupName, + Value: args.Value, + PolicyOverride: args.PolicyOverride, + Reason: args.Reason, + } + // parseWriteRequest overrides Namespace, Region and AuthToken + // based on values from the original http request + s.parseWriteRequest(req, &scaleReq.WriteRequest) + + var out structs.JobRegisterResponse + if err := s.agent.RPC("Job.Scale", &scaleReq, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out, nil +} + func (s *HTTPServer) jobVersions(resp http.ResponseWriter, req *http.Request, jobName string) (interface{}, error) { diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 807a58375..eebb1fb97 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -666,6 +666,66 @@ func TestHTTP_JobDelete(t *testing.T) { }) } +func TestHTTP_Job_GroupScale(t *testing.T) { + t.Parallel() + + require := require.New(t) + + httpTest(t, nil, func(s *TestAgent) { + // Create the job + job, policy := mock.JobWithScalingPolicy() + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + newCount := job.TaskGroups[0].Count + 1 + scaleReq := &api.ScalingRequest{ + JobID: job.ID, + Value: newCount, + Reason: "testing", + } + buf := encodeReq(scaleReq) + + // Make the HTTP request to scale the job group + req, err := http.NewRequest("POST", policy.Target, buf) + require.NoError(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req) + require.NoError(err) + + // Check the response + resp = obj.(structs.JobRegisterResponse) + require.NotEmpty(resp.EvalID) + + // Check for the index + require.NotEmpty(respW.Header().Get("X-Nomad-Index")) + + // Check that the group count was changed + getReq := structs.JobSpecificRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var getResp structs.SingleJobResponse + err = s.Agent.RPC("Job.GetJob", &getReq, &getResp) + require.NoError(err) + require.NotNil(getResp.Job) + require.Equal(newCount, getResp.Job.TaskGroups[0].Count) + }) +} + func TestHTTP_JobForceEvaluate(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d03e9c403..f772bff0c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -618,6 +618,18 @@ type JobPlanRequest struct { WriteRequest } +// JobScaleRequest is used for the Job.Scale endpoint to scale one of the +// scaling targets in a job +type JobScaleRequest struct { + JobID string + GroupName string + Value interface{} + Reason string + // PolicyOverride is set when the user is attempting to override any policies + PolicyOverride bool + WriteRequest +} + // JobSummaryRequest is used when we just need to get a specific job summary type JobSummaryRequest struct { JobID string