diff --git a/.changelog/20130.txt b/.changelog/20130.txt new file mode 100644 index 000000000..1488def61 --- /dev/null +++ b/.changelog/20130.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: new /v1/jobs/statuses endpoint collates details about jobs' allocs and latest deployment, intended for use in the updated UI jobs index page +``` diff --git a/api/jobs.go b/api/jobs.go index 3bd60c4ef..407b13568 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -1544,3 +1544,12 @@ func (j *Jobs) ActionExec(ctx context.Context, return s.run(ctx) } + +// JobStatusesRequest is used to get statuses for jobs, +// their allocations and deployments. +type JobStatusesRequest struct { + // Jobs may be optionally provided to request a subset of specific jobs. + Jobs []NamespacedID + // IncludeChildren will include child (batch) jobs in the response. + IncludeChildren bool +} diff --git a/api/namespace.go b/api/namespace.go index cbe5849f5..bc12ec77d 100644 --- a/api/namespace.go +++ b/api/namespace.go @@ -156,3 +156,12 @@ func (n NamespaceIndexSort) Less(i, j int) bool { func (n NamespaceIndexSort) Swap(i, j int) { n[i], n[j] = n[j], n[i] } + +// NamespacedID is used for things that are unique only per-namespace, +// such as jobs. +type NamespacedID struct { + // Namespace is the Name of the Namespace + Namespace string + // ID is the ID of the namespaced object (e.g. Job ID) + ID string +} diff --git a/command/agent/http.go b/command/agent/http.go index c08d4fdf5..77394e8d8 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -382,6 +382,7 @@ func (s *HTTPServer) ResolveToken(req *http.Request) (*acl.ACL, error) { func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest)) s.mux.HandleFunc("/v1/jobs/parse", s.wrap(s.JobsParseRequest)) + s.mux.HandleFunc("/v1/jobs/statuses", s.wrap(s.JobStatusesRequest)) s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest)) s.mux.HandleFunc("/v1/nodes", s.wrap(s.NodesRequest)) diff --git a/command/agent/job_endpoint_statuses.go b/command/agent/job_endpoint_statuses.go new file mode 100644 index 000000000..5bd6f4d36 --- /dev/null +++ b/command/agent/job_endpoint_statuses.go @@ -0,0 +1,79 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package agent + +import ( + "fmt" + "net/http" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" +) + +// JobStatusesRequest looks up the status of jobs' allocs and deployments, +// primarily for use in the UI on the /ui/jobs index page. +func (s *HTTPServer) JobStatusesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var out structs.JobStatusesResponse + args := structs.JobStatusesRequest{} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + switch req.Method { + case http.MethodGet, http.MethodPost: + break + default: + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } + + if includeChildren, err := parseBool(req, "include_children"); err != nil { + return nil, CodedError(http.StatusBadRequest, err.Error()) + } else if includeChildren != nil { + args.IncludeChildren = *includeChildren + } + + // ostensibly GETs should not accept structured body, but the HTTP spec + // on this is more what you'd call "guidelines" than actual rules. + if req.Body != nil && req.Body != http.NoBody { + var in api.JobStatusesRequest + if err := decodeBody(req, &in); err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("error decoding request: %v", err)) + } + if len(in.Jobs) == 0 { + return nil, CodedError(http.StatusBadRequest, "no jobs in request") + } + + // each job has a separate namespace, so in case the NSes are mixed, + // default to wildcard. + // if all requested jobs turn out to have the same namespace, + // then the RPC endpoint will notice that and override this anyway. + if args.QueryOptions.Namespace == structs.DefaultNamespace { + args.QueryOptions.Namespace = structs.AllNamespacesSentinel + } + + args.Jobs = make([]structs.NamespacedID, len(in.Jobs)) + for i, j := range in.Jobs { + if j.Namespace == "" { + j.Namespace = structs.DefaultNamespace + } + args.Jobs[i] = structs.NamespacedID{ + ID: j.ID, + Namespace: j.Namespace, + } + } + + // not a direct assignment, because if it is false (default), + // it could override the "include_children" query param. + if in.IncludeChildren { + args.IncludeChildren = true + } + } + + if err := s.agent.RPC("Job.Statuses", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + return out.Jobs, nil +} diff --git a/command/agent/job_endpoint_statuses_test.go b/command/agent/job_endpoint_statuses_test.go new file mode 100644 index 000000000..88ba07833 --- /dev/null +++ b/command/agent/job_endpoint_statuses_test.go @@ -0,0 +1,247 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package agent + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +func TestJobEndpoint_Statuses(t *testing.T) { + ci.Parallel(t) + httpTest(t, cb, func(s *TestAgent) { + apiPath := "/v1/jobs/statuses" + + parent := mock.MinJob() + parent.ID = "parent" + child := mock.MinJob() + child.ID = "parent/child" + child.ParentID = "parent" + otherNS := mock.MinJob() + otherNS.ID = "otherNS" + otherNS.Namespace = "other" + + // lil helpers + registerJob := func(t *testing.T, job *structs.Job) { + must.NoError(t, s.Agent.RPC("Job.Register", + &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{}), + ) + } + createNamespace := func(t *testing.T, ns string) { + must.NoError(t, s.Agent.RPC("Namespace.UpsertNamespaces", + &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{{ + Name: ns, + }}, + WriteRequest: structs.WriteRequest{Region: "global"}, + }, &structs.GenericResponse{})) + } + buildRequest := func(t *testing.T, method, url, body string) *http.Request { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + var reqBody io.Reader = http.NoBody + if body != "" { + reqBody = bytes.NewReader([]byte(body)) + } + req, err := http.NewRequestWithContext(ctx, method, url, reqBody) + must.NoError(t, err) + return req + } + + // note: this api will return jobs ordered by ModifyIndex, + // so in reverse order of their creation here. + registerJob(t, parent) + registerJob(t, child) + createNamespace(t, otherNS.Namespace) + registerJob(t, otherNS) + + testCases := []struct { + name string + + // request + method, params, body string + + // response + expectCode int + expectErr string + expectIDs []string + expectHeaders []string + }{ + { + name: "bad method", method: "LOL", + expectCode: 405, expectErr: ErrInvalidMethod, + }, + { + name: "bad request param", + params: "?include_children=not-a-bool", + expectCode: 400, expectErr: `Failed to parse value of "include_children"`, + }, + + { + name: "get ok", + expectIDs: []string{"parent"}, + }, + { + name: "get all namespaces", + params: "?namespace=*", + expectIDs: []string{"otherNS", "parent"}, + }, + { + name: "get all reverse", + params: "?namespace=*&reverse=true", + expectIDs: []string{"parent", "otherNS"}, + }, + { + name: "get one page", + params: "?namespace=*&per_page=1", + expectIDs: []string{"otherNS"}, + expectHeaders: []string{"X-Nomad-NextToken"}, + }, + { + name: "get children", + params: "?include_children=true", + expectIDs: []string{"parent/child", "parent"}, + }, + { + name: "get children filter", + // this is how the UI does parent job pages + params: "?include_children=true&filter=ParentID == parent", + expectIDs: []string{"parent/child"}, + }, + + // POST and GET are interchangeable, but by convention, the UI will + // POST when sending a request body, so here we test like that too. + { + name: "post no jobs", + method: "POST", + body: `{"jobs": []}`, + expectCode: 400, expectErr: "no jobs in request", + }, + { + name: "post bad body", + method: "POST", body: "{malformed", + expectCode: 400, expectErr: "error decoding request: invalid character 'm'", + }, + { + name: "post nonexistent job", + method: "POST", + body: `{"jobs": [{"id": "whatever", "namespace": "nope"}]}`, + expectIDs: []string{}, + }, + { + name: "post single job", + method: "POST", + body: `{"jobs": [{"id": "parent"}]}`, + expectIDs: []string{"parent"}, + }, + { + name: "post all namespaces", + method: "POST", + // no ?namespace param required, because we default to "*" + // if there is a request body (and ns query is "default") + body: `{"jobs": [{"id": "parent"}, {"id": "otherNS", "namespace": "other"}]}`, + expectIDs: []string{"otherNS", "parent"}, + }, + { + name: "post auto namespace", + method: "POST", + // namespace gets overridden by the RPC endpoint, + // because jobs in the request body are all one namespace. + params: "?namespace=nope", + body: `{"jobs": [{"id": "parent", "namespace": "default"}]}`, + expectIDs: []string{"parent"}, + }, + { + name: "post auto namespaces other", + method: "POST", + // "other" namespace should be auto-detected, as it's the only one + body: `{"jobs": [{"id": "otherNS", "namespace": "other"}]}`, + expectIDs: []string{"otherNS"}, + }, + { + name: "post wrong namespace param", + method: "POST", + params: "?namespace=nope", + // namespace can not be auto-detected, since there are two here, + // so it uses the provided param + body: `{"jobs": [{"id": "parent"}, {"id": "otherNS", "namespace": "other"}]}`, + expectIDs: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // default happy path values + if tc.method == "" { + tc.method = "GET" + } + if tc.expectCode == 0 { + tc.expectCode = 200 + } + + req := buildRequest(t, tc.method, apiPath+tc.params, tc.body) + recorder := httptest.NewRecorder() + + // method under test! + raw, err := s.Server.JobStatusesRequest(recorder, req) + + // sad path + if tc.expectErr != "" { + must.ErrorContains(t, err, tc.expectErr) + var coded *codedError + must.True(t, errors.As(err, &coded)) + must.Eq(t, tc.expectCode, coded.code) + + must.Nil(t, raw) + return + } + + // happy path + must.NoError(t, err) + result := recorder.Result() + must.Eq(t, tc.expectCode, result.StatusCode) + + // check response body + jobs := raw.([]structs.JobStatusesJob) + gotIDs := make([]string, len(jobs)) + for i, j := range jobs { + gotIDs[i] = j.ID + } + must.Eq(t, tc.expectIDs, gotIDs) + + // check headers + expectHeaders := append( + []string{ + "X-Nomad-Index", + "X-Nomad-Lastcontact", + "X-Nomad-Knownleader", + }, + tc.expectHeaders..., + ) + for _, h := range expectHeaders { + test.NotEq(t, "", result.Header.Get(h), + test.Sprintf("expect '%s' header", h)) + } + }) + } + }) +} diff --git a/nomad/job_endpoint_statuses.go b/nomad/job_endpoint_statuses.go new file mode 100644 index 000000000..18da2e5f3 --- /dev/null +++ b/nomad/job_endpoint_statuses.go @@ -0,0 +1,303 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package nomad + +import ( + "errors" + "net/http" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-set/v2" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Statuses looks up info about jobs, their allocs, and latest deployment. +func (j *Job) Statuses( + args *structs.JobStatusesRequest, + reply *structs.JobStatusesResponse) error { + + authErr := j.srv.Authenticate(j.ctx, args) + if done, err := j.srv.forward("Job.Statuses", args, args, reply); done { + return err + } + j.srv.MeasureRPCRate("job", structs.RateMetricList, args) + if authErr != nil { + return structs.ErrPermissionDenied + } + defer metrics.MeasureSince([]string{"nomad", "jobs", "statuses"}, time.Now()) + + namespace := args.RequestNamespace() + // the namespace from the UI by default is "*", but if specific jobs are + // requested, all with the same namespace, AllowNsOp() below may be able + // to quickly deny the request if the token lacks permissions for that ns, + // rather than iterating the whole jobs table and filtering out every job. + if len(args.Jobs) > 0 { + nses := set.New[string](1) + for _, j := range args.Jobs { + nses.Insert(j.Namespace) + } + if nses.Size() == 1 { + namespace = nses.Slice()[0] + } + } + + // check for read-job permissions, since this endpoint includes alloc info + // and possibly a deployment ID, and those APIs require read-job. + aclObj, err := j.srv.ResolveACL(args) + if err != nil { + return err + } + if !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + allow := aclObj.AllowNsOpFunc(acl.NamespaceCapabilityReadJob) + + store := j.srv.State() + + // get the namespaces the user is allowed to access. + allowableNamespaces, err := allowedNSes(aclObj, store, allow) + if errors.Is(err, structs.ErrPermissionDenied) { + // return empty jobs if token isn't authorized for any + // namespace, matching other endpoints + reply.Jobs = make([]structs.JobStatusesJob, 0) + return nil + } else if err != nil { + return err + } + // since the state index we're using doesn't include namespace, + // explicitly add the user-provided ns to our filter if needed. + // (allowableNamespaces will be nil if the caller sent a mgmt token) + if allowableNamespaces == nil && + namespace != "" && + namespace != structs.AllNamespacesSentinel { + allowableNamespaces = map[string]bool{ + namespace: true, + } + } + + // compare between state run() unblocks to see if the RPC, as a whole, + // should unblock. i.e. if new jobs shift the page, or when jobs go away. + prevJobs := set.New[structs.NamespacedID](0) + + // because the state index is in order of ModifyIndex, lowest to highest, + // SortDefault would show oldest jobs first, so instead invert the default + // to show most recent job changes first. + args.QueryOptions.Reverse = !args.QueryOptions.Reverse + sort := state.QueryOptionSort(args.QueryOptions) + + // special blocking note: this endpoint employs an unconventional method + // of determining the reply.Index in order to avoid unblocking when + // something changes "off page" -- instead of using the latest index + // from any/all of the state tables queried here (all of: "jobs", "allocs", + // "deployments"), we use the highest ModifyIndex of all items encountered + // while iterating. + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + var err error + var iter memdb.ResultIterator + + // the UI jobs index page shows most-recently changed first. + iter, err = state.JobsByModifyIndex(ws, sort) + if err != nil { + return err + } + + // set up tokenizer and filters + tokenizer := paginator.NewStructsTokenizer( + iter, + paginator.StructsTokenizerOptions{ + OnlyModifyIndex: true, + }, + ) + filters := []paginator.Filter{ + paginator.NamespaceFilter{ + AllowableNamespaces: allowableNamespaces, + }, + // skip child jobs unless requested to include them + paginator.GenericFilter{Allow: func(i interface{}) (bool, error) { + if args.IncludeChildren { + return true, nil + } + job := i.(*structs.Job) + return job.ParentID == "", nil + }}, + } + // only provide specific jobs if requested. + if len(args.Jobs) > 0 { + // set per-page to avoid iterating the whole table + args.QueryOptions.PerPage = int32(len(args.Jobs)) + // filter in the requested jobs + jobSet := set.From[structs.NamespacedID](args.Jobs) + filters = append(filters, paginator.GenericFilter{ + Allow: func(i interface{}) (bool, error) { + job := i.(*structs.Job) + return jobSet.Contains(job.NamespacedID()), nil + }, + }) + } + + jobs := make([]structs.JobStatusesJob, 0) + newJobs := set.New[structs.NamespacedID](0) + pager, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions, + func(raw interface{}) error { + job := raw.(*structs.Job) + + // this is where the sausage is made + jsj, highestIndexOnPage, err := jobStatusesJobFromJob(ws, state, job) + if err != nil { + return err + } + + jobs = append(jobs, jsj) + newJobs.Insert(job.NamespacedID()) + + // by using the highest index we find on any job/alloc/ + // deployment among the jobs on the page, instead of the + // latest index for any particular state table, we can + // avoid unblocking the RPC if something changes "off page" + if highestIndexOnPage > reply.Index { + reply.Index = highestIndexOnPage + } + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusInternalServerError, "failed to create result paginator: %v", err) + } + + nextToken, err := pager.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusInternalServerError, "failed to read result page: %v", err) + } + + // if the page has updated, or a job has gone away, + // bump the index to latest jobs entry. + if !prevJobs.Empty() && !newJobs.Equal(prevJobs) { + reply.Index, err = state.Index("jobs") + if err != nil { + return err + } + } + prevJobs = newJobs + + reply.QueryMeta.NextToken = nextToken + reply.Jobs = jobs + + return nil + }} + return j.srv.blockingRPC(&opts) +} + +func jobStatusesJobFromJob(ws memdb.WatchSet, store *state.StateStore, job *structs.Job) (structs.JobStatusesJob, uint64, error) { + highestIdx := job.ModifyIndex + + jsj := structs.JobStatusesJob{ + NamespacedID: structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + }, + Name: job.Name, + Type: job.Type, + NodePool: job.NodePool, + Datacenters: job.Datacenters, + Priority: job.Priority, + Version: job.Version, + ParentID: job.ParentID, + SubmitTime: job.SubmitTime, + ModifyIndex: job.ModifyIndex, + // included here for completeness, populated below. + Allocs: nil, + GroupCountSum: 0, + ChildStatuses: nil, + LatestDeployment: nil, + } + + // the GroupCountSum will map to how many allocations we expect to run + // (for service jobs) + for _, tg := range job.TaskGroups { + jsj.GroupCountSum += tg.Count + } + + // collect the statuses of child jobs + if job.IsParameterized() || job.IsPeriodic() { + jsj.ChildStatuses = make([]string, 0) // set to not-nil + children, err := store.JobsByIDPrefix(ws, job.Namespace, job.ID, state.SortDefault) + if err != nil { + return jsj, highestIdx, err + } + for { + child := children.Next() + if child == nil { + break + } + j := child.(*structs.Job) + // note: this filters out grandchildren jobs (children of children) + if j.ParentID != job.ID { + continue + } + if j.ModifyIndex > highestIdx { + highestIdx = j.ModifyIndex + } + jsj.ChildStatuses = append(jsj.ChildStatuses, j.Status) + } + // no allocs or deployments for parameterized/period jobs, + // so we're done here. + return jsj, highestIdx, err + } + + // collect info about allocations + allocs, err := store.AllocsByJob(ws, job.Namespace, job.ID, true) + if err != nil { + return jsj, highestIdx, err + } + for _, a := range allocs { + jsa := structs.JobStatusesAlloc{ + ID: a.ID, + Group: a.TaskGroup, + ClientStatus: a.ClientStatus, + NodeID: a.NodeID, + JobVersion: a.Job.Version, + FollowupEvalID: a.FollowupEvalID, + } + if a.DeploymentStatus != nil { + jsa.DeploymentStatus.Canary = a.DeploymentStatus.IsCanary() + jsa.DeploymentStatus.Healthy = a.DeploymentStatus.Healthy + } + jsj.Allocs = append(jsj.Allocs, jsa) + + if a.ModifyIndex > highestIdx { + highestIdx = a.ModifyIndex + } + } + + // look for latest deployment + deploy, err := store.LatestDeploymentByJobID(ws, job.Namespace, job.ID) + if err != nil { + return jsj, highestIdx, err + } + if deploy != nil { + jsj.LatestDeployment = &structs.JobStatusesLatestDeployment{ + ID: deploy.ID, + IsActive: deploy.Active(), + JobVersion: deploy.JobVersion, + Status: deploy.Status, + StatusDescription: deploy.StatusDescription, + AllAutoPromote: deploy.HasAutoPromote(), + RequiresPromotion: deploy.RequiresPromotion(), + } + + if deploy.ModifyIndex > highestIdx { + highestIdx = deploy.ModifyIndex + } + } + return jsj, highestIdx, nil +} diff --git a/nomad/job_endpoint_statuses_test.go b/nomad/job_endpoint_statuses_test.go new file mode 100644 index 000000000..c5b3367d8 --- /dev/null +++ b/nomad/job_endpoint_statuses_test.go @@ -0,0 +1,382 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package nomad + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +func TestJob_Statuses_ACL(t *testing.T) { + s, _, cleanup := TestACLServer(t, nil) + t.Cleanup(cleanup) + testutil.WaitForLeader(t, s.RPC) + + insufficientToken := mock.CreatePolicyAndToken(t, s.State(), 1, "job-lister", + mock.NamespacePolicy("default", "", []string{"list-jobs"})) + happyToken := mock.CreatePolicyAndToken(t, s.State(), 2, "job-reader", + mock.NamespacePolicy("default", "", []string{"read-job"})) + + for _, tc := range []struct { + name, token, err string + }{ + {"no token", "", "Permission denied"}, + {"insufficient perms", insufficientToken.SecretID, "Permission denied"}, + {"happy token", happyToken.SecretID, ""}, + } { + t.Run(tc.name, func(t *testing.T) { + req := &structs.JobStatusesRequest{} + req.QueryOptions.Region = "global" + req.QueryOptions.AuthToken = tc.token + + var resp structs.JobStatusesResponse + err := s.RPC("Job.Statuses", &req, &resp) + + if tc.err != "" { + must.ErrorContains(t, err, tc.err) + } else { + must.NoError(t, err) + } + }) + } +} + +func TestJob_Statuses(t *testing.T) { + s, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + t.Cleanup(cleanup) + testutil.WaitForLeader(t, s.RPC) + + // method under test + doRequest := func(t *testing.T, req *structs.JobStatusesRequest) (resp structs.JobStatusesResponse) { + t.Helper() + must.NotNil(t, req, must.Sprint("request must not be nil")) + req.QueryOptions.Region = "global" + must.NoError(t, s.RPC("Job.Statuses", req, &resp)) + return resp + } + + // increment state index helper + incIdx := func(t *testing.T) uint64 { + t.Helper() + idx, err := s.State().LatestIndex() + must.NoError(t, err) + return idx + 1 + } + + // job helpers + deleteJob := func(t *testing.T, job *structs.Job) { + t.Helper() + err := s.State().DeleteJob(incIdx(t), job.Namespace, job.ID) + if err != nil && err.Error() == "job not found" { + return + } + must.NoError(t, err) + } + upsertJob := func(t *testing.T, job *structs.Job) { + t.Helper() + err := s.State().UpsertJob(structs.MsgTypeTestSetup, incIdx(t), nil, job) + must.NoError(t, err) + } + createJob := func(t *testing.T, id string) (job *structs.Job, cleanup func()) { + t.Helper() + job = mock.MinJob() + if id != "" { + job.ID = id + } + upsertJob(t, job) + cleanup = func() { + deleteJob(t, job) + } + t.Cleanup(cleanup) + return job, cleanup + } + + // this little cutie sets the latest state index to a predictable value, + // to ensure the below jobs span the boundary from 999->1000 which would + // break pagination without proper uint64 NextToken (ModifyIndex) comparison + must.NoError(t, s.State().UpsertNamespaces(996, nil)) + + // set up some jobs + // they should be in this order in state using the "modify_index" index, + // but the RPC will return them in reverse order by default. + jobs := make([]*structs.Job, 5) + var deleteJob0, deleteJob1, deleteJob2 func() + jobs[0], deleteJob0 = createJob(t, "job0") + jobs[1], deleteJob1 = createJob(t, "job1") + jobs[2], deleteJob2 = createJob(t, "job2") + jobs[3], _ = createJob(t, "job3") + jobs[4], _ = createJob(t, "job4") + + // request all jobs + resp := doRequest(t, &structs.JobStatusesRequest{}) + must.Len(t, 5, resp.Jobs) + + // make sure our state order assumption is correct + for i, j := range resp.Jobs { + reverse := len(jobs) - i - 1 + must.Eq(t, jobs[reverse].ID, j.ID, must.Sprintf("jobs not in order; idx=%d", i)) + } + + // test various single-job requests + + for _, tc := range []struct { + name string + qo structs.QueryOptions + jobs []structs.NamespacedID + expect *structs.Job + expectNext uint64 // NextToken (ModifyIndex) + }{ + { + name: "page 1", + qo: structs.QueryOptions{ + PerPage: 1, + }, + expect: jobs[4], + expectNext: jobs[3].ModifyIndex, + }, + { + name: "page 2", + qo: structs.QueryOptions{ + PerPage: 1, + NextToken: strconv.FormatUint(jobs[3].ModifyIndex, 10), + }, + expect: jobs[3], + expectNext: jobs[2].ModifyIndex, + }, + { + name: "reverse", + qo: structs.QueryOptions{ + PerPage: 1, + Reverse: true, + }, + expect: jobs[0], + expectNext: jobs[1].ModifyIndex, + }, + { + name: "filter", + qo: structs.QueryOptions{ + Filter: "ID == " + jobs[0].ID, + }, + expect: jobs[0], + }, + { + name: "specific", + jobs: []structs.NamespacedID{ + jobs[0].NamespacedID(), + }, + expect: jobs[0], + }, + { + name: "missing", + jobs: []structs.NamespacedID{ + { + ID: "do-not-exist", + Namespace: "anywhere", + }, + }, + expect: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + resp = doRequest(t, &structs.JobStatusesRequest{ + QueryOptions: tc.qo, + Jobs: tc.jobs, + }) + if tc.expect == nil { + must.Len(t, 0, resp.Jobs, must.Sprint("expect no jobs")) + } else { + must.Len(t, 1, resp.Jobs, must.Sprint("expect only one job")) + must.Eq(t, tc.expect.ID, resp.Jobs[0].ID) + } + expectToken := "" + if tc.expectNext > 0 { + expectToken = strconv.FormatUint(tc.expectNext, 10) + } + must.Eq(t, expectToken, resp.NextToken) + }) + } + + // test blocking queries + + // this endpoint should only unblock if something relevant changes. + // "something relevant" is why this seemingly redundant blocking-query + // testing is done here, as the logic to determine what is "relevant" is + // specific to this endpoint, meaning the latest ModifyIndex on each + // job/alloc/deployment seen while iterating, i.e. those "on-page". + + // blocking query helpers + startQuery := func(t *testing.T, req *structs.JobStatusesRequest) context.Context { + t.Helper() + if req == nil { + req = &structs.JobStatusesRequest{} + } + // context to signal when the query unblocks + // mustBlock and mustUnblock below work by checking ctx.Done() + ctx, cancel := context.WithCancel(context.Background()) + // default latest index to induce blocking + if req.QueryOptions.MinQueryIndex == 0 { + idx, err := s.State().LatestIndex() + must.NoError(t, err) + req.QueryOptions.MinQueryIndex = idx + } + // start the query + // note: queries that are expected to remain blocked leak this goroutine + // unless some other test (or cleanup) coincidentally unblocks it + go func() { + resp = doRequest(t, req) + cancel() + }() + // give it a moment for the rpc to actually start up and begin blocking + // FLAKE ALERT: if this job is flaky, this might be why. + time.Sleep(time.Millisecond * 100) + return ctx + } + mustBlock := func(t *testing.T, ctx context.Context) { + t.Helper() + timer := time.NewTimer(time.Millisecond * 200) + defer timer.Stop() + select { + case <-ctx.Done(): + t.Fatal("query should be blocked") + case <-timer.C: + } + } + mustUnblock := func(t *testing.T, ctx context.Context) { + t.Helper() + timer := time.NewTimer(time.Millisecond * 200) + defer timer.Stop() + select { + case <-ctx.Done(): + case <-timer.C: + t.Fatal("query should have unblocked") + } + } + + // alloc and deployment helpers + createAlloc := func(t *testing.T, job *structs.Job) { + t.Helper() + a := mock.MinAllocForJob(job) + must.NoError(t, + s.State().UpsertAllocs(structs.AllocUpdateRequestType, incIdx(t), []*structs.Allocation{a}), + must.Sprintf("error creating alloc for job %s", job.ID)) + t.Cleanup(func() { + test.NoError(t, s.State().DeleteEval(incIdx(t), []string{}, []string{a.ID}, false)) + }) + } + createDeployment := func(t *testing.T, job *structs.Job) { + t.Helper() + deploy := mock.Deployment() + deploy.JobID = job.ID + must.NoError(t, s.State().UpsertDeployment(incIdx(t), deploy)) + t.Cleanup(func() { + test.NoError(t, s.State().DeleteDeployment(incIdx(t), []string{deploy.ID})) + }) + } + + // these must be run in order, as they affect outer-scope state. + + for _, tc := range []struct { + name string + watch *structs.Job // optional specific job to query + run func(*testing.T) // run after starting the blocking query + check func(*testing.T, context.Context) // mustBlock or mustUnblock + }{ + { + name: "get all jobs", + check: mustBlock, + }, + { + name: "delete job", + run: func(_ *testing.T) { + deleteJob0() + }, + check: mustUnblock, + }, + { + name: "change job", + run: func(t *testing.T) { + jobs[1].Name = "job1-new-name" + upsertJob(t, jobs[1]) + }, + check: mustUnblock, + }, + { + name: "new job", + run: func(t *testing.T) { + createJob(t, "new1") + }, + check: mustUnblock, + }, + { + name: "delete job off page", + watch: jobs[2], + run: func(_ *testing.T) { + deleteJob1() + }, + check: mustBlock, + }, + { + name: "delete job on page", + watch: jobs[2], + run: func(_ *testing.T) { + deleteJob2() + }, + check: mustUnblock, + }, + { + name: "new alloc on page", + watch: jobs[3], + run: func(t *testing.T) { + createAlloc(t, jobs[3]) + }, + check: mustUnblock, + }, + { + name: "new alloc off page", + watch: jobs[3], + run: func(t *testing.T) { + createAlloc(t, jobs[4]) + }, + check: mustBlock, + }, + { + name: "new deployment on page", + watch: jobs[3], + run: func(t *testing.T) { + createDeployment(t, jobs[3]) + }, + check: mustUnblock, + }, + { + name: "new deployment off page", + watch: jobs[3], + run: func(t *testing.T) { + createDeployment(t, jobs[4]) + }, + check: mustBlock, + }, + } { + t.Run(tc.name, func(t *testing.T) { + req := &structs.JobStatusesRequest{} + if tc.watch != nil { + req.Jobs = []structs.NamespacedID{tc.watch.NamespacedID()} + } + ctx := startQuery(t, req) + if tc.run != nil { + tc.run(t) + } + tc.check(t, ctx) + }) + } +} diff --git a/nomad/mock/alloc.go b/nomad/mock/alloc.go index bfdd6ba35..170c3b54b 100644 --- a/nomad/mock/alloc.go +++ b/nomad/mock/alloc.go @@ -87,7 +87,10 @@ func Alloc() *structs.Allocation { } func MinAlloc() *structs.Allocation { - job := MinJob() + return MinAllocForJob(MinJob()) +} + +func MinAllocForJob(job *structs.Job) *structs.Allocation { group := job.TaskGroups[0] task := group.Tasks[0] return &structs.Allocation{ @@ -95,6 +98,8 @@ func MinAlloc() *structs.Allocation { EvalID: uuid.Generate(), NodeID: uuid.Generate(), Job: job, + JobID: job.ID, + Namespace: job.Namespace, TaskGroup: group.Name, ClientStatus: structs.AllocClientStatusPending, DesiredStatus: structs.AllocDesiredStatusRun, diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 93c77d0e4..e27261bf1 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -254,6 +254,15 @@ func jobTableSchema() *memdb.TableSchema { Field: "NodePool", }, }, + // ModifyIndex allows sorting by last-changed + "modify_index": { + Name: "modify_index", + AllowMissing: false, + Unique: true, + Indexer: &memdb.UintFieldIndex{ + Field: "ModifyIndex", + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 31d04cc6d..277f393ae 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2450,6 +2450,20 @@ func (s *StateStore) JobsByPool(ws memdb.WatchSet, pool string) (memdb.ResultIte return iter, nil } +// JobsByModifyIndex returns an iterator over all jobs, sorted by ModifyIndex. +func (s *StateStore) JobsByModifyIndex(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := getSorted(txn, sort, "jobs", "modify_index") + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + // JobSummaryByID returns a job summary object which matches a specific id. func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { txn := s.db.ReadTxn() diff --git a/nomad/structs/job.go b/nomad/structs/job.go index 8eb69b917..e57ef5c2e 100644 --- a/nomad/structs/job.go +++ b/nomad/structs/job.go @@ -16,6 +16,75 @@ const ( JobServiceRegistrationsRPCMethod = "Job.GetServiceRegistrations" ) +// JobStatusesRequest is used on the Job.Statuses RPC endpoint +// to get job/alloc/deployment status for jobs. +type JobStatusesRequest struct { + // Jobs may be optionally provided to request a subset of specific jobs. + Jobs []NamespacedID + // IncludeChildren will include child (batch) jobs in the response. + IncludeChildren bool + QueryOptions +} + +// JobStatusesResponse is the response from Job.Statuses RPC endpoint. +type JobStatusesResponse struct { + Jobs []JobStatusesJob + QueryMeta +} + +// JobStatusesJob collates information about a Job, its Allocation(s), +// and latest Deployment. +type JobStatusesJob struct { + NamespacedID + Name string + Type string + NodePool string + Datacenters []string + Priority int + Version uint64 + SubmitTime int64 + ModifyIndex uint64 + // Allocs contains information about current allocations + Allocs []JobStatusesAlloc + // GroupCountSum is the sum of all group{count=X} values, + // can be compared against number of running allocs to determine + // overall health for "service" jobs. + GroupCountSum int + // ChildStatuses contains the statuses of child (batch) jobs + ChildStatuses []string + // ParentID is set on child (batch) jobs, specifying the parent job ID + ParentID string + LatestDeployment *JobStatusesLatestDeployment +} + +// JobStatusesAlloc contains a subset of Allocation info. +type JobStatusesAlloc struct { + ID string + Group string + ClientStatus string + NodeID string + DeploymentStatus JobStatusesDeployment + JobVersion uint64 + FollowupEvalID string +} + +// JobStatusesDeployment contains a subset of AllocDeploymentStatus info. +type JobStatusesDeployment struct { + Canary bool + Healthy *bool +} + +// JobStatusesLatestDeployment contains a subset of the latest Deployment. +type JobStatusesLatestDeployment struct { + ID string + IsActive bool + JobVersion uint64 + Status string + StatusDescription string + AllAutoPromote bool + RequiresPromotion bool +} + // JobServiceRegistrationsRequest is the request object used to list all // service registrations belonging to the specified Job.ID. type JobServiceRegistrationsRequest struct {