mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
wip: scaling status return, almost done
This commit is contained in:
@@ -177,8 +177,8 @@ func (j *Jobs) Scale(jobID, group string, count int,
|
||||
|
||||
// ScaleStatus is used to retrieve information about a particular
|
||||
// job given its unique ID.
|
||||
func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*ScaleStatusResponse, *QueryMeta, error) {
|
||||
var resp ScaleStatusResponse
|
||||
func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) {
|
||||
var resp JobScaleStatusResponse
|
||||
qm, err := j.client.query(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), &resp, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
||||
@@ -69,12 +69,12 @@ type ScalingPolicyListStub struct {
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
// ScaleStatusResponse is the payload for a generic scaling action
|
||||
type ScaleStatusResponse struct {
|
||||
// JobScaleStatusResponse is the payload for a generic scaling action
|
||||
type JobScaleStatusResponse struct {
|
||||
JobID string
|
||||
JobCreateIndex uint64
|
||||
JobModifyIndex uint64
|
||||
Stopped bool
|
||||
JobStopped bool
|
||||
TaskGroups map[string]TaskGroupScaleStatus
|
||||
}
|
||||
|
||||
|
||||
@@ -490,7 +490,7 @@ func (s *HTTPServer) jobScaleStatus(resp http.ResponseWriter, req *http.Request,
|
||||
return nil, CodedError(404, "job not found")
|
||||
}
|
||||
|
||||
status := &api.ScaleStatusResponse{
|
||||
status := &api.JobScaleStatusResponse{
|
||||
JobID: out.Job.ID,
|
||||
JobCreateIndex: out.Job.CreateIndex,
|
||||
JobModifyIndex: out.Job.ModifyIndex,
|
||||
|
||||
@@ -758,7 +758,7 @@ func TestHTTP_Job_GroupScaleStatus(t *testing.T) {
|
||||
require.NoError(err)
|
||||
|
||||
// Check the response
|
||||
status := obj.(*api.ScaleStatusResponse)
|
||||
status := obj.(*api.JobScaleStatusResponse)
|
||||
require.NotEmpty(resp.EvalID)
|
||||
require.Equal(job.TaskGroups[0].Count, status.Value.(int))
|
||||
|
||||
|
||||
@@ -840,7 +840,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scale is used to modify one of the scaling targest in the job
|
||||
// Scale is used to modify one of the scaling targets in the job
|
||||
func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterResponse) error {
|
||||
if done, err := j.srv.forward("Job.Scale", args, args, reply); done {
|
||||
return err
|
||||
@@ -1688,3 +1688,71 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ScaleStatus retrieves the scaling status for a job
|
||||
func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
|
||||
reply *structs.JobScaleStatusResponse) error {
|
||||
|
||||
if done, err := j.srv.forward("Job.ScaleStatus", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "job", "scale_status"}, time.Now())
|
||||
|
||||
// FINISH
|
||||
// Check for job-autoscaler permissions
|
||||
// if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
// return err
|
||||
// } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
|
||||
// return structs.ErrPermissionDenied
|
||||
// }
|
||||
|
||||
// Setup the blocking query
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||
|
||||
// We need the job and the job summary
|
||||
job, err := state.JobByID(ws, args.RequestNamespace(), args.JobID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if job == nil {
|
||||
return structs.NewErrRPCCoded(404, "job does not exist")
|
||||
}
|
||||
deployment, err := state.LatestDeploymentByJobID(ws, args.RequestNamespace(), args.JobID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup the output
|
||||
reply.JobModifyIndex = job.ModifyIndex
|
||||
reply.JobCreateIndex = job.CreateIndex
|
||||
reply.JobID = job.ID
|
||||
reply.JobStopped = job.Stop
|
||||
|
||||
for _, tg := range job.TaskGroups {
|
||||
tgScale := &structs.TaskGroupScaleStatus{
|
||||
Desired: tg.Count,
|
||||
}
|
||||
if deployment != nil {
|
||||
if ds, ok := deployment.TaskGroups[tg.Name]; ok {
|
||||
tgScale.Placed = ds.PlacedAllocs
|
||||
tgScale.Healthy = ds.HealthyAllocs
|
||||
tgScale.Unhealthy = ds.UnhealthyAllocs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if deployment != nil && deployment.ModifyIndex > job.ModifyIndex {
|
||||
reply.Index = deployment.ModifyIndex
|
||||
} else {
|
||||
reply.Index = job.ModifyIndex
|
||||
}
|
||||
|
||||
// Set the query response
|
||||
j.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
}}
|
||||
return j.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
@@ -5267,3 +5267,61 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_GetScaleStatus(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
job := mock.Job()
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.JobRegisterResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
|
||||
job.CreateIndex = resp.JobModifyIndex
|
||||
job.ModifyIndex = resp.JobModifyIndex
|
||||
|
||||
// Fetch the scaling status
|
||||
get := &structs.JobScaleStatusRequest{
|
||||
JobID: job.ID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp2 structs.JobScaleStatusResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.ScaleStatus", get, &resp2))
|
||||
|
||||
expectedStatus := structs.JobScaleStatusResponse{
|
||||
JobID: job.ID,
|
||||
JobCreateIndex: job.CreateIndex,
|
||||
JobModifyIndex: job.ModifyIndex,
|
||||
JobStopped: job.Stop,
|
||||
TaskGroups: map[string]structs.TaskGroupScaleStatus{
|
||||
job.TaskGroups[0].Name: {
|
||||
Desired: 1,
|
||||
Placed: 1,
|
||||
Running: 1,
|
||||
Healthy: 0,
|
||||
Unhealthy: 0,
|
||||
Events: nil,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require.True(reflect.DeepEqual(resp2, expectedStatus))
|
||||
}
|
||||
|
||||
@@ -639,6 +639,12 @@ type JobSummaryRequest struct {
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
// JobScaleStatusRequest is used to get the scale status for a job
|
||||
type JobScaleStatusRequest struct {
|
||||
JobID string
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
// JobDispatchRequest is used to dispatch a job based on a parameterized job
|
||||
type JobDispatchRequest struct {
|
||||
JobID string
|
||||
@@ -1228,6 +1234,35 @@ type JobSummaryResponse struct {
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// JobScaleStatusResponse is used to return the scale status for a job
|
||||
type JobScaleStatusResponse struct {
|
||||
JobID string
|
||||
JobCreateIndex uint64
|
||||
JobModifyIndex uint64
|
||||
JobStopped bool
|
||||
TaskGroups map[string]TaskGroupScaleStatus
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// TaskGroupScaleStatus is used to return the scale status for a given task group
|
||||
type TaskGroupScaleStatus struct {
|
||||
Desired int
|
||||
Placed int
|
||||
Running int
|
||||
Healthy int
|
||||
Unhealthy int
|
||||
Events []ScalingEvent
|
||||
}
|
||||
|
||||
// ScalingEvent represents a specific scaling event
|
||||
type ScalingEvent struct {
|
||||
Reason *string
|
||||
Error *string
|
||||
Meta map[string]interface{}
|
||||
Time uint64
|
||||
EvalID *string
|
||||
}
|
||||
|
||||
type JobDispatchResponse struct {
|
||||
DispatchedJobID string
|
||||
EvalID string
|
||||
|
||||
Reference in New Issue
Block a user