From 03eb96aba23d7745c1c82d9b460dc2f5a4afd63a Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Fri, 20 Mar 2020 22:00:31 +0000 Subject: [PATCH] wip: scaling status return, almost done --- api/jobs.go | 4 +- api/scaling.go | 6 +-- command/agent/job_endpoint.go | 2 +- command/agent/job_endpoint_test.go | 2 +- nomad/job_endpoint.go | 70 +++++++++++++++++++++++++++++- nomad/job_endpoint_test.go | 58 +++++++++++++++++++++++++ nomad/structs/structs.go | 35 +++++++++++++++ 7 files changed, 169 insertions(+), 8 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index c8df51be3..843d85a97 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -177,8 +177,8 @@ func (j *Jobs) Scale(jobID, group string, count int, // ScaleStatus is used to retrieve information about a particular // job given its unique ID. -func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*ScaleStatusResponse, *QueryMeta, error) { - var resp ScaleStatusResponse +func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) { + var resp JobScaleStatusResponse qm, err := j.client.query(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), &resp, q) if err != nil { return nil, nil, err diff --git a/api/scaling.go b/api/scaling.go index 2eae4abf5..3948b9921 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -69,12 +69,12 @@ type ScalingPolicyListStub struct { ModifyIndex uint64 } -// ScaleStatusResponse is the payload for a generic scaling action -type ScaleStatusResponse struct { +// JobScaleStatusResponse is the payload for a generic scaling action +type JobScaleStatusResponse struct { JobID string JobCreateIndex uint64 JobModifyIndex uint64 - Stopped bool + JobStopped bool TaskGroups map[string]TaskGroupScaleStatus } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2cfcb026b..aafc52839 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -490,7 +490,7 @@ func (s *HTTPServer) jobScaleStatus(resp http.ResponseWriter, req *http.Request, return nil, CodedError(404, "job not found") } - status := &api.ScaleStatusResponse{ + status := &api.JobScaleStatusResponse{ JobID: out.Job.ID, JobCreateIndex: out.Job.CreateIndex, JobModifyIndex: out.Job.ModifyIndex, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 6cf3b86d5..afbf0f607 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -758,7 +758,7 @@ func TestHTTP_Job_GroupScaleStatus(t *testing.T) { require.NoError(err) // Check the response - status := obj.(*api.ScaleStatusResponse) + status := obj.(*api.JobScaleStatusResponse) require.NotEmpty(resp.EvalID) require.Equal(job.TaskGroups[0].Count, status.Value.(int)) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e1c29b26d..7f5ef41c0 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -840,7 +840,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st return nil } -// Scale is used to modify one of the scaling targest in the job +// Scale is used to modify one of the scaling targets in the job func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterResponse) error { if done, err := j.srv.forward("Job.Scale", args, args, reply); done { return err @@ -1688,3 +1688,71 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) return nil } + +// ScaleStatus retrieves the scaling status for a job +func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, + reply *structs.JobScaleStatusResponse) error { + + if done, err := j.srv.forward("Job.ScaleStatus", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "scale_status"}, time.Now()) + + // FINISH + // Check for job-autoscaler permissions + // if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + // return err + // } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + // return structs.ErrPermissionDenied + // } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + + // We need the job and the job summary + job, err := state.JobByID(ws, args.RequestNamespace(), args.JobID) + if err != nil { + return err + } + if job == nil { + return structs.NewErrRPCCoded(404, "job does not exist") + } + deployment, err := state.LatestDeploymentByJobID(ws, args.RequestNamespace(), args.JobID) + if err != nil { + return err + } + + // Setup the output + reply.JobModifyIndex = job.ModifyIndex + reply.JobCreateIndex = job.CreateIndex + reply.JobID = job.ID + reply.JobStopped = job.Stop + + for _, tg := range job.TaskGroups { + tgScale := &structs.TaskGroupScaleStatus{ + Desired: tg.Count, + } + if deployment != nil { + if ds, ok := deployment.TaskGroups[tg.Name]; ok { + tgScale.Placed = ds.PlacedAllocs + tgScale.Healthy = ds.HealthyAllocs + tgScale.Unhealthy = ds.UnhealthyAllocs + } + } + } + + if deployment != nil && deployment.ModifyIndex > job.ModifyIndex { + reply.Index = deployment.ModifyIndex + } else { + reply.Index = job.ModifyIndex + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) +} diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 5954a2bc4..97fdccf1b 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5267,3 +5267,61 @@ func TestJobEndpoint_Dispatch(t *testing.T) { }) } } + +func TestJobEndpoint_GetScaleStatus(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp structs.JobRegisterResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + job.CreateIndex = resp.JobModifyIndex + job.ModifyIndex = resp.JobModifyIndex + + // Fetch the scaling status + get := &structs.JobScaleStatusRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobScaleStatusResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Job.ScaleStatus", get, &resp2)) + + expectedStatus := structs.JobScaleStatusResponse{ + JobID: job.ID, + JobCreateIndex: job.CreateIndex, + JobModifyIndex: job.ModifyIndex, + JobStopped: job.Stop, + TaskGroups: map[string]structs.TaskGroupScaleStatus{ + job.TaskGroups[0].Name: { + Desired: 1, + Placed: 1, + Running: 1, + Healthy: 0, + Unhealthy: 0, + Events: nil, + }, + }, + } + + require.True(reflect.DeepEqual(resp2, expectedStatus)) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ee9c0fa56..65f80cfe8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -639,6 +639,12 @@ type JobSummaryRequest struct { QueryOptions } +// JobScaleStatusRequest is used to get the scale status for a job +type JobScaleStatusRequest struct { + JobID string + QueryOptions +} + // JobDispatchRequest is used to dispatch a job based on a parameterized job type JobDispatchRequest struct { JobID string @@ -1228,6 +1234,35 @@ type JobSummaryResponse struct { QueryMeta } +// JobScaleStatusResponse is used to return the scale status for a job +type JobScaleStatusResponse struct { + JobID string + JobCreateIndex uint64 + JobModifyIndex uint64 + JobStopped bool + TaskGroups map[string]TaskGroupScaleStatus + QueryMeta +} + +// TaskGroupScaleStatus is used to return the scale status for a given task group +type TaskGroupScaleStatus struct { + Desired int + Placed int + Running int + Healthy int + Unhealthy int + Events []ScalingEvent +} + +// ScalingEvent represents a specific scaling event +type ScalingEvent struct { + Reason *string + Error *string + Meta map[string]interface{} + Time uint64 + EvalID *string +} + type JobDispatchResponse struct { DispatchedJobID string EvalID string