wip: working on job group scaling endpoint

Chris Baker
2020-01-16 22:14:00 +00:00
parent a715eb7820
commit 8102849683
5 changed files with 140 additions and 12 deletions

api/scaling.go (new file)

@@ -0,0 +1,22 @@
package api
// ScalingPolicy is the user-specified API object for an autoscaling policy
type ScalingPolicy struct {
Policy map[string]interface{}
Enabled *bool
}
func (p *ScalingPolicy) Canonicalize() {
if p.Enabled == nil {
p.Enabled = boolToPtr(true)
}
}
// ScalingRequest is the payload for a generic scaling action
type ScalingRequest struct {
JobID string
Value interface{}
Reason string
WriteRequest
PolicyOverride bool
}
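
The Canonicalize behavior above defaults Enabled to true when the caller leaves it nil. A minimal standalone sketch of that behavior (the real type lives in package api and uses its unexported boolToPtr helper; this version inlines the pointer construction):

package main

import "fmt"

// ScalingPolicy mirrors the api.ScalingPolicy type added in this commit.
type ScalingPolicy struct {
	Policy  map[string]interface{}
	Enabled *bool
}

// Canonicalize defaults Enabled to true when it is unset.
func (p *ScalingPolicy) Canonicalize() {
	if p.Enabled == nil {
		enabled := true
		p.Enabled = &enabled
	}
}

func main() {
	p := &ScalingPolicy{Policy: map[string]interface{}{"min": 1, "max": 10}}
	p.Canonicalize()
	fmt.Println(*p.Enabled) // prints: true
}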


@@ -409,18 +409,6 @@ func (vm *VolumeMount) Canonicalize() {
}
}
// ScalingPolicy is the user-specified API object for an autoscaling policy
type ScalingPolicy struct {
Policy map[string]interface{}
Enabled *bool
}
func (p *ScalingPolicy) Canonicalize() {
if p.Enabled == nil {
p.Enabled = boolToPtr(true)
}
}
// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name *string


@@ -3,6 +3,7 @@ package agent
import (
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
@@ -82,6 +83,8 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/stable"):
jobName := strings.TrimSuffix(path, "/stable")
return s.jobStable(resp, req, jobName)
case strings.HasSuffix(path, "/scale"):
return s.jobScale(resp, req, path)
default:
return s.jobCRUD(resp, req, path)
}
@@ -454,6 +457,49 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return out, nil
}
func (s *HTTPServer) jobScale(resp http.ResponseWriter, req *http.Request,
jobAndTarget string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}
var args api.ScalingRequest
if err := decodeBody(req, &args); err != nil {
return nil, CodedError(400, err.Error())
}
if args.JobID == "" {
return nil, CodedError(400, "Job ID must be specified")
}
if !strings.HasPrefix(jobAndTarget, args.JobID) {
return nil, CodedError(400, "Job ID does not match")
}
subTarget := strings.TrimPrefix(jobAndTarget, args.JobID)
// the remainder of the target should look like "/<group>/scale";
// capture the group name between the slashes
groupScale := regexp.MustCompile(`^/([^/]+)/scale$`)
match := groupScale.FindStringSubmatch(subTarget)
if match == nil {
return nil, CodedError(400, "Invalid scaling target")
}
groupName := match[1]
scaleReq := structs.JobScaleRequest{
JobID: args.JobID,
GroupName: groupName,
Value: args.Value,
PolicyOverride: args.PolicyOverride,
Reason: args.Reason,
}
// parseWriteRequest overrides Namespace, Region and AuthToken
// based on values from the original http request
s.parseWriteRequest(req, &scaleReq.WriteRequest)
var out structs.JobRegisterResponse
if err := s.agent.RPC("Job.Scale", &scaleReq, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out, nil
}
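
The handler above expects the path remainder after the job ID to look like "/<group>/scale". A standalone sketch of that extraction, using made-up job and group names for illustration:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// hypothetical path suffix as routed by JobSpecificRequest
	jobAndTarget := "example-job/web/scale"
	jobID := "example-job"

	subTarget := strings.TrimPrefix(jobAndTarget, jobID) // "/web/scale"

	// capture the group name between the job ID and the trailing /scale
	groupScale := regexp.MustCompile(`^/([^/]+)/scale$`)
	match := groupScale.FindStringSubmatch(subTarget)
	if match == nil {
		fmt.Println("invalid scaling target")
		return
	}
	fmt.Println("group:", match[1]) // prints: group: web
}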
func (s *HTTPServer) jobVersions(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {


@@ -666,6 +666,66 @@ func TestHTTP_JobDelete(t *testing.T) {
})
}
func TestHTTP_Job_GroupScale(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
// Create the job
job, policy := mock.JobWithScalingPolicy()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}
newCount := job.TaskGroups[0].Count + 1
scaleReq := &api.ScalingRequest{
JobID: job.ID,
Value: newCount,
Reason: "testing",
}
buf := encodeReq(scaleReq)
// Make the HTTP request to scale the job group
req, err := http.NewRequest("POST", policy.Target, buf)
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
require.NoError(err)
// Check the response
resp = obj.(structs.JobRegisterResponse)
require.NotEmpty(resp.EvalID)
// Check for the index
require.NotEmpty(respW.Header().Get("X-Nomad-Index"))
// Check that the group count was changed
getReq := structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var getResp structs.SingleJobResponse
err = s.Agent.RPC("Job.GetJob", &getReq, &getResp)
require.NoError(err)
require.NotNil(getResp.Job)
require.Equal(newCount, getResp.Job.TaskGroups[0].Count)
})
}
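
For reference, an equivalent request made directly over HTTP might look like the sketch below. The job name, group name, and target path are assumptions: the handler's routing implies a path of the form /v1/job/<jobID>/<group>/scale, but the exact target comes from mock.JobWithScalingPolicy, which is not shown in this diff.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// fields mirror api.ScalingRequest; values are hypothetical
	payload := map[string]interface{}{
		"JobID":  "example-job",
		"Value":  3,
		"Reason": "testing",
	}
	body, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}

	// assumed target path; a local agent listens on 4646 by default
	url := "http://127.0.0.1:4646/v1/job/example-job/web/scale"
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}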
func TestHTTP_JobForceEvaluate(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {


@@ -618,6 +618,18 @@ type JobPlanRequest struct {
WriteRequest
}
// JobScaleRequest is used for the Job.Scale endpoint to scale one of the
// scaling targets in a job
type JobScaleRequest struct {
JobID string
GroupName string
Value interface{}
Reason string
// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool
WriteRequest
}
// JobSummaryRequest is used when we just need to get a specific job summary
type JobSummaryRequest struct {
JobID string