From cf87a556b3c7544f774feabb997935162d596a1f Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Fri, 3 May 2024 15:01:40 -0500 Subject: [PATCH] api: new /v1/jobs/statuses endpoint for /ui/jobs page (#20130) introduce a new API /v1/jobs/statuses, primarily for use in the UI, which collates info about jobs, their allocations, and latest deployment. currently the UI gets *all* of /v1/jobs and sorts and paginates them client-side in the browser, and its "summary" column is based on historical summary data (which can be visually misleading, and sometimes scary when a job has failed at some point in the not-yet-garbage-collected past). this does pagination and filtering and such, and returns jobs sorted by ModifyIndex, so latest-changed jobs still come first. it pulls allocs and latest deployment straight out of current state for more a more robust, holistic view of the job status. it is less efficient per-job, due to the extra state lookups, but should be more efficient per-page (excepting perhaps for job(s) with very-many allocs). if a POST body is sent like `{"jobs": [{"namespace": "cool-ns", "id": "cool-job"}]}`, then the response will be limited to that subset of jobs. the main goal here is to prevent "jostling" the user in the UI when jobs come into and out of existence. and if a blocking query is started with `?index=N`, then the query should only unblock if jobs "on page" change, rather than any change to any of the state tables being queried ("jobs", "allocs", and "deployment"), to save unnecessary HTTP round trips. --- .changelog/20130.txt | 3 + api/jobs.go | 9 + api/namespace.go | 9 + command/agent/http.go | 1 + command/agent/job_endpoint_statuses.go | 79 ++++ command/agent/job_endpoint_statuses_test.go | 247 +++++++++++++ nomad/job_endpoint_statuses.go | 303 ++++++++++++++++ nomad/job_endpoint_statuses_test.go | 382 ++++++++++++++++++++ nomad/mock/alloc.go | 7 +- nomad/state/schema.go | 9 + nomad/state/state_store.go | 14 + nomad/structs/job.go | 69 ++++ 12 files changed, 1131 insertions(+), 1 deletion(-) create mode 100644 .changelog/20130.txt create mode 100644 command/agent/job_endpoint_statuses.go create mode 100644 command/agent/job_endpoint_statuses_test.go create mode 100644 nomad/job_endpoint_statuses.go create mode 100644 nomad/job_endpoint_statuses_test.go diff --git a/.changelog/20130.txt b/.changelog/20130.txt new file mode 100644 index 000000000..1488def61 --- /dev/null +++ b/.changelog/20130.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: new /v1/jobs/statuses endpoint collates details about jobs' allocs and latest deployment, intended for use in the updated UI jobs index page +``` diff --git a/api/jobs.go b/api/jobs.go index 3bd60c4ef..407b13568 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -1544,3 +1544,12 @@ func (j *Jobs) ActionExec(ctx context.Context, return s.run(ctx) } + +// JobStatusesRequest is used to get statuses for jobs, +// their allocations and deployments. +type JobStatusesRequest struct { + // Jobs may be optionally provided to request a subset of specific jobs. + Jobs []NamespacedID + // IncludeChildren will include child (batch) jobs in the response. + IncludeChildren bool +} diff --git a/api/namespace.go b/api/namespace.go index cbe5849f5..bc12ec77d 100644 --- a/api/namespace.go +++ b/api/namespace.go @@ -156,3 +156,12 @@ func (n NamespaceIndexSort) Less(i, j int) bool { func (n NamespaceIndexSort) Swap(i, j int) { n[i], n[j] = n[j], n[i] } + +// NamespacedID is used for things that are unique only per-namespace, +// such as jobs. +type NamespacedID struct { + // Namespace is the Name of the Namespace + Namespace string + // ID is the ID of the namespaced object (e.g. Job ID) + ID string +} diff --git a/command/agent/http.go b/command/agent/http.go index c08d4fdf5..77394e8d8 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -382,6 +382,7 @@ func (s *HTTPServer) ResolveToken(req *http.Request) (*acl.ACL, error) { func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest)) s.mux.HandleFunc("/v1/jobs/parse", s.wrap(s.JobsParseRequest)) + s.mux.HandleFunc("/v1/jobs/statuses", s.wrap(s.JobStatusesRequest)) s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest)) s.mux.HandleFunc("/v1/nodes", s.wrap(s.NodesRequest)) diff --git a/command/agent/job_endpoint_statuses.go b/command/agent/job_endpoint_statuses.go new file mode 100644 index 000000000..5bd6f4d36 --- /dev/null +++ b/command/agent/job_endpoint_statuses.go @@ -0,0 +1,79 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package agent + +import ( + "fmt" + "net/http" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" +) + +// JobStatusesRequest looks up the status of jobs' allocs and deployments, +// primarily for use in the UI on the /ui/jobs index page. +func (s *HTTPServer) JobStatusesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var out structs.JobStatusesResponse + args := structs.JobStatusesRequest{} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + switch req.Method { + case http.MethodGet, http.MethodPost: + break + default: + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } + + if includeChildren, err := parseBool(req, "include_children"); err != nil { + return nil, CodedError(http.StatusBadRequest, err.Error()) + } else if includeChildren != nil { + args.IncludeChildren = *includeChildren + } + + // ostensibly GETs should not accept structured body, but the HTTP spec + // on this is more what you'd call "guidelines" than actual rules. + if req.Body != nil && req.Body != http.NoBody { + var in api.JobStatusesRequest + if err := decodeBody(req, &in); err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("error decoding request: %v", err)) + } + if len(in.Jobs) == 0 { + return nil, CodedError(http.StatusBadRequest, "no jobs in request") + } + + // each job has a separate namespace, so in case the NSes are mixed, + // default to wildcard. + // if all requested jobs turn out to have the same namespace, + // then the RPC endpoint will notice that and override this anyway. + if args.QueryOptions.Namespace == structs.DefaultNamespace { + args.QueryOptions.Namespace = structs.AllNamespacesSentinel + } + + args.Jobs = make([]structs.NamespacedID, len(in.Jobs)) + for i, j := range in.Jobs { + if j.Namespace == "" { + j.Namespace = structs.DefaultNamespace + } + args.Jobs[i] = structs.NamespacedID{ + ID: j.ID, + Namespace: j.Namespace, + } + } + + // not a direct assignment, because if it is false (default), + // it could override the "include_children" query param. + if in.IncludeChildren { + args.IncludeChildren = true + } + } + + if err := s.agent.RPC("Job.Statuses", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + return out.Jobs, nil +} diff --git a/command/agent/job_endpoint_statuses_test.go b/command/agent/job_endpoint_statuses_test.go new file mode 100644 index 000000000..88ba07833 --- /dev/null +++ b/command/agent/job_endpoint_statuses_test.go @@ -0,0 +1,247 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package agent + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +func TestJobEndpoint_Statuses(t *testing.T) { + ci.Parallel(t) + httpTest(t, cb, func(s *TestAgent) { + apiPath := "/v1/jobs/statuses" + + parent := mock.MinJob() + parent.ID = "parent" + child := mock.MinJob() + child.ID = "parent/child" + child.ParentID = "parent" + otherNS := mock.MinJob() + otherNS.ID = "otherNS" + otherNS.Namespace = "other" + + // lil helpers + registerJob := func(t *testing.T, job *structs.Job) { + must.NoError(t, s.Agent.RPC("Job.Register", + &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{}), + ) + } + createNamespace := func(t *testing.T, ns string) { + must.NoError(t, s.Agent.RPC("Namespace.UpsertNamespaces", + &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{{ + Name: ns, + }}, + WriteRequest: structs.WriteRequest{Region: "global"}, + }, &structs.GenericResponse{})) + } + buildRequest := func(t *testing.T, method, url, body string) *http.Request { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + var reqBody io.Reader = http.NoBody + if body != "" { + reqBody = bytes.NewReader([]byte(body)) + } + req, err := http.NewRequestWithContext(ctx, method, url, reqBody) + must.NoError(t, err) + return req + } + + // note: this api will return jobs ordered by ModifyIndex, + // so in reverse order of their creation here. + registerJob(t, parent) + registerJob(t, child) + createNamespace(t, otherNS.Namespace) + registerJob(t, otherNS) + + testCases := []struct { + name string + + // request + method, params, body string + + // response + expectCode int + expectErr string + expectIDs []string + expectHeaders []string + }{ + { + name: "bad method", method: "LOL", + expectCode: 405, expectErr: ErrInvalidMethod, + }, + { + name: "bad request param", + params: "?include_children=not-a-bool", + expectCode: 400, expectErr: `Failed to parse value of "include_children"`, + }, + + { + name: "get ok", + expectIDs: []string{"parent"}, + }, + { + name: "get all namespaces", + params: "?namespace=*", + expectIDs: []string{"otherNS", "parent"}, + }, + { + name: "get all reverse", + params: "?namespace=*&reverse=true", + expectIDs: []string{"parent", "otherNS"}, + }, + { + name: "get one page", + params: "?namespace=*&per_page=1", + expectIDs: []string{"otherNS"}, + expectHeaders: []string{"X-Nomad-NextToken"}, + }, + { + name: "get children", + params: "?include_children=true", + expectIDs: []string{"parent/child", "parent"}, + }, + { + name: "get children filter", + // this is how the UI does parent job pages + params: "?include_children=true&filter=ParentID == parent", + expectIDs: []string{"parent/child"}, + }, + + // POST and GET are interchangeable, but by convention, the UI will + // POST when sending a request body, so here we test like that too. + { + name: "post no jobs", + method: "POST", + body: `{"jobs": []}`, + expectCode: 400, expectErr: "no jobs in request", + }, + { + name: "post bad body", + method: "POST", body: "{malformed", + expectCode: 400, expectErr: "error decoding request: invalid character 'm'", + }, + { + name: "post nonexistent job", + method: "POST", + body: `{"jobs": [{"id": "whatever", "namespace": "nope"}]}`, + expectIDs: []string{}, + }, + { + name: "post single job", + method: "POST", + body: `{"jobs": [{"id": "parent"}]}`, + expectIDs: []string{"parent"}, + }, + { + name: "post all namespaces", + method: "POST", + // no ?namespace param required, because we default to "*" + // if there is a request body (and ns query is "default") + body: `{"jobs": [{"id": "parent"}, {"id": "otherNS", "namespace": "other"}]}`, + expectIDs: []string{"otherNS", "parent"}, + }, + { + name: "post auto namespace", + method: "POST", + // namespace gets overridden by the RPC endpoint, + // because jobs in the request body are all one namespace. + params: "?namespace=nope", + body: `{"jobs": [{"id": "parent", "namespace": "default"}]}`, + expectIDs: []string{"parent"}, + }, + { + name: "post auto namespaces other", + method: "POST", + // "other" namespace should be auto-detected, as it's the only one + body: `{"jobs": [{"id": "otherNS", "namespace": "other"}]}`, + expectIDs: []string{"otherNS"}, + }, + { + name: "post wrong namespace param", + method: "POST", + params: "?namespace=nope", + // namespace can not be auto-detected, since there are two here, + // so it uses the provided param + body: `{"jobs": [{"id": "parent"}, {"id": "otherNS", "namespace": "other"}]}`, + expectIDs: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // default happy path values + if tc.method == "" { + tc.method = "GET" + } + if tc.expectCode == 0 { + tc.expectCode = 200 + } + + req := buildRequest(t, tc.method, apiPath+tc.params, tc.body) + recorder := httptest.NewRecorder() + + // method under test! + raw, err := s.Server.JobStatusesRequest(recorder, req) + + // sad path + if tc.expectErr != "" { + must.ErrorContains(t, err, tc.expectErr) + var coded *codedError + must.True(t, errors.As(err, &coded)) + must.Eq(t, tc.expectCode, coded.code) + + must.Nil(t, raw) + return + } + + // happy path + must.NoError(t, err) + result := recorder.Result() + must.Eq(t, tc.expectCode, result.StatusCode) + + // check response body + jobs := raw.([]structs.JobStatusesJob) + gotIDs := make([]string, len(jobs)) + for i, j := range jobs { + gotIDs[i] = j.ID + } + must.Eq(t, tc.expectIDs, gotIDs) + + // check headers + expectHeaders := append( + []string{ + "X-Nomad-Index", + "X-Nomad-Lastcontact", + "X-Nomad-Knownleader", + }, + tc.expectHeaders..., + ) + for _, h := range expectHeaders { + test.NotEq(t, "", result.Header.Get(h), + test.Sprintf("expect '%s' header", h)) + } + }) + } + }) +} diff --git a/nomad/job_endpoint_statuses.go b/nomad/job_endpoint_statuses.go new file mode 100644 index 000000000..18da2e5f3 --- /dev/null +++ b/nomad/job_endpoint_statuses.go @@ -0,0 +1,303 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package nomad + +import ( + "errors" + "net/http" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-set/v2" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Statuses looks up info about jobs, their allocs, and latest deployment. +func (j *Job) Statuses( + args *structs.JobStatusesRequest, + reply *structs.JobStatusesResponse) error { + + authErr := j.srv.Authenticate(j.ctx, args) + if done, err := j.srv.forward("Job.Statuses", args, args, reply); done { + return err + } + j.srv.MeasureRPCRate("job", structs.RateMetricList, args) + if authErr != nil { + return structs.ErrPermissionDenied + } + defer metrics.MeasureSince([]string{"nomad", "jobs", "statuses"}, time.Now()) + + namespace := args.RequestNamespace() + // the namespace from the UI by default is "*", but if specific jobs are + // requested, all with the same namespace, AllowNsOp() below may be able + // to quickly deny the request if the token lacks permissions for that ns, + // rather than iterating the whole jobs table and filtering out every job. + if len(args.Jobs) > 0 { + nses := set.New[string](1) + for _, j := range args.Jobs { + nses.Insert(j.Namespace) + } + if nses.Size() == 1 { + namespace = nses.Slice()[0] + } + } + + // check for read-job permissions, since this endpoint includes alloc info + // and possibly a deployment ID, and those APIs require read-job. + aclObj, err := j.srv.ResolveACL(args) + if err != nil { + return err + } + if !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) { + return structs.ErrPermissionDenied + } + allow := aclObj.AllowNsOpFunc(acl.NamespaceCapabilityReadJob) + + store := j.srv.State() + + // get the namespaces the user is allowed to access. + allowableNamespaces, err := allowedNSes(aclObj, store, allow) + if errors.Is(err, structs.ErrPermissionDenied) { + // return empty jobs if token isn't authorized for any + // namespace, matching other endpoints + reply.Jobs = make([]structs.JobStatusesJob, 0) + return nil + } else if err != nil { + return err + } + // since the state index we're using doesn't include namespace, + // explicitly add the user-provided ns to our filter if needed. + // (allowableNamespaces will be nil if the caller sent a mgmt token) + if allowableNamespaces == nil && + namespace != "" && + namespace != structs.AllNamespacesSentinel { + allowableNamespaces = map[string]bool{ + namespace: true, + } + } + + // compare between state run() unblocks to see if the RPC, as a whole, + // should unblock. i.e. if new jobs shift the page, or when jobs go away. + prevJobs := set.New[structs.NamespacedID](0) + + // because the state index is in order of ModifyIndex, lowest to highest, + // SortDefault would show oldest jobs first, so instead invert the default + // to show most recent job changes first. + args.QueryOptions.Reverse = !args.QueryOptions.Reverse + sort := state.QueryOptionSort(args.QueryOptions) + + // special blocking note: this endpoint employs an unconventional method + // of determining the reply.Index in order to avoid unblocking when + // something changes "off page" -- instead of using the latest index + // from any/all of the state tables queried here (all of: "jobs", "allocs", + // "deployments"), we use the highest ModifyIndex of all items encountered + // while iterating. + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + var err error + var iter memdb.ResultIterator + + // the UI jobs index page shows most-recently changed first. + iter, err = state.JobsByModifyIndex(ws, sort) + if err != nil { + return err + } + + // set up tokenizer and filters + tokenizer := paginator.NewStructsTokenizer( + iter, + paginator.StructsTokenizerOptions{ + OnlyModifyIndex: true, + }, + ) + filters := []paginator.Filter{ + paginator.NamespaceFilter{ + AllowableNamespaces: allowableNamespaces, + }, + // skip child jobs unless requested to include them + paginator.GenericFilter{Allow: func(i interface{}) (bool, error) { + if args.IncludeChildren { + return true, nil + } + job := i.(*structs.Job) + return job.ParentID == "", nil + }}, + } + // only provide specific jobs if requested. + if len(args.Jobs) > 0 { + // set per-page to avoid iterating the whole table + args.QueryOptions.PerPage = int32(len(args.Jobs)) + // filter in the requested jobs + jobSet := set.From[structs.NamespacedID](args.Jobs) + filters = append(filters, paginator.GenericFilter{ + Allow: func(i interface{}) (bool, error) { + job := i.(*structs.Job) + return jobSet.Contains(job.NamespacedID()), nil + }, + }) + } + + jobs := make([]structs.JobStatusesJob, 0) + newJobs := set.New[structs.NamespacedID](0) + pager, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions, + func(raw interface{}) error { + job := raw.(*structs.Job) + + // this is where the sausage is made + jsj, highestIndexOnPage, err := jobStatusesJobFromJob(ws, state, job) + if err != nil { + return err + } + + jobs = append(jobs, jsj) + newJobs.Insert(job.NamespacedID()) + + // by using the highest index we find on any job/alloc/ + // deployment among the jobs on the page, instead of the + // latest index for any particular state table, we can + // avoid unblocking the RPC if something changes "off page" + if highestIndexOnPage > reply.Index { + reply.Index = highestIndexOnPage + } + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusInternalServerError, "failed to create result paginator: %v", err) + } + + nextToken, err := pager.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusInternalServerError, "failed to read result page: %v", err) + } + + // if the page has updated, or a job has gone away, + // bump the index to latest jobs entry. + if !prevJobs.Empty() && !newJobs.Equal(prevJobs) { + reply.Index, err = state.Index("jobs") + if err != nil { + return err + } + } + prevJobs = newJobs + + reply.QueryMeta.NextToken = nextToken + reply.Jobs = jobs + + return nil + }} + return j.srv.blockingRPC(&opts) +} + +func jobStatusesJobFromJob(ws memdb.WatchSet, store *state.StateStore, job *structs.Job) (structs.JobStatusesJob, uint64, error) { + highestIdx := job.ModifyIndex + + jsj := structs.JobStatusesJob{ + NamespacedID: structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + }, + Name: job.Name, + Type: job.Type, + NodePool: job.NodePool, + Datacenters: job.Datacenters, + Priority: job.Priority, + Version: job.Version, + ParentID: job.ParentID, + SubmitTime: job.SubmitTime, + ModifyIndex: job.ModifyIndex, + // included here for completeness, populated below. + Allocs: nil, + GroupCountSum: 0, + ChildStatuses: nil, + LatestDeployment: nil, + } + + // the GroupCountSum will map to how many allocations we expect to run + // (for service jobs) + for _, tg := range job.TaskGroups { + jsj.GroupCountSum += tg.Count + } + + // collect the statuses of child jobs + if job.IsParameterized() || job.IsPeriodic() { + jsj.ChildStatuses = make([]string, 0) // set to not-nil + children, err := store.JobsByIDPrefix(ws, job.Namespace, job.ID, state.SortDefault) + if err != nil { + return jsj, highestIdx, err + } + for { + child := children.Next() + if child == nil { + break + } + j := child.(*structs.Job) + // note: this filters out grandchildren jobs (children of children) + if j.ParentID != job.ID { + continue + } + if j.ModifyIndex > highestIdx { + highestIdx = j.ModifyIndex + } + jsj.ChildStatuses = append(jsj.ChildStatuses, j.Status) + } + // no allocs or deployments for parameterized/period jobs, + // so we're done here. + return jsj, highestIdx, err + } + + // collect info about allocations + allocs, err := store.AllocsByJob(ws, job.Namespace, job.ID, true) + if err != nil { + return jsj, highestIdx, err + } + for _, a := range allocs { + jsa := structs.JobStatusesAlloc{ + ID: a.ID, + Group: a.TaskGroup, + ClientStatus: a.ClientStatus, + NodeID: a.NodeID, + JobVersion: a.Job.Version, + FollowupEvalID: a.FollowupEvalID, + } + if a.DeploymentStatus != nil { + jsa.DeploymentStatus.Canary = a.DeploymentStatus.IsCanary() + jsa.DeploymentStatus.Healthy = a.DeploymentStatus.Healthy + } + jsj.Allocs = append(jsj.Allocs, jsa) + + if a.ModifyIndex > highestIdx { + highestIdx = a.ModifyIndex + } + } + + // look for latest deployment + deploy, err := store.LatestDeploymentByJobID(ws, job.Namespace, job.ID) + if err != nil { + return jsj, highestIdx, err + } + if deploy != nil { + jsj.LatestDeployment = &structs.JobStatusesLatestDeployment{ + ID: deploy.ID, + IsActive: deploy.Active(), + JobVersion: deploy.JobVersion, + Status: deploy.Status, + StatusDescription: deploy.StatusDescription, + AllAutoPromote: deploy.HasAutoPromote(), + RequiresPromotion: deploy.RequiresPromotion(), + } + + if deploy.ModifyIndex > highestIdx { + highestIdx = deploy.ModifyIndex + } + } + return jsj, highestIdx, nil +} diff --git a/nomad/job_endpoint_statuses_test.go b/nomad/job_endpoint_statuses_test.go new file mode 100644 index 000000000..c5b3367d8 --- /dev/null +++ b/nomad/job_endpoint_statuses_test.go @@ -0,0 +1,382 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package nomad + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +func TestJob_Statuses_ACL(t *testing.T) { + s, _, cleanup := TestACLServer(t, nil) + t.Cleanup(cleanup) + testutil.WaitForLeader(t, s.RPC) + + insufficientToken := mock.CreatePolicyAndToken(t, s.State(), 1, "job-lister", + mock.NamespacePolicy("default", "", []string{"list-jobs"})) + happyToken := mock.CreatePolicyAndToken(t, s.State(), 2, "job-reader", + mock.NamespacePolicy("default", "", []string{"read-job"})) + + for _, tc := range []struct { + name, token, err string + }{ + {"no token", "", "Permission denied"}, + {"insufficient perms", insufficientToken.SecretID, "Permission denied"}, + {"happy token", happyToken.SecretID, ""}, + } { + t.Run(tc.name, func(t *testing.T) { + req := &structs.JobStatusesRequest{} + req.QueryOptions.Region = "global" + req.QueryOptions.AuthToken = tc.token + + var resp structs.JobStatusesResponse + err := s.RPC("Job.Statuses", &req, &resp) + + if tc.err != "" { + must.ErrorContains(t, err, tc.err) + } else { + must.NoError(t, err) + } + }) + } +} + +func TestJob_Statuses(t *testing.T) { + s, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + t.Cleanup(cleanup) + testutil.WaitForLeader(t, s.RPC) + + // method under test + doRequest := func(t *testing.T, req *structs.JobStatusesRequest) (resp structs.JobStatusesResponse) { + t.Helper() + must.NotNil(t, req, must.Sprint("request must not be nil")) + req.QueryOptions.Region = "global" + must.NoError(t, s.RPC("Job.Statuses", req, &resp)) + return resp + } + + // increment state index helper + incIdx := func(t *testing.T) uint64 { + t.Helper() + idx, err := s.State().LatestIndex() + must.NoError(t, err) + return idx + 1 + } + + // job helpers + deleteJob := func(t *testing.T, job *structs.Job) { + t.Helper() + err := s.State().DeleteJob(incIdx(t), job.Namespace, job.ID) + if err != nil && err.Error() == "job not found" { + return + } + must.NoError(t, err) + } + upsertJob := func(t *testing.T, job *structs.Job) { + t.Helper() + err := s.State().UpsertJob(structs.MsgTypeTestSetup, incIdx(t), nil, job) + must.NoError(t, err) + } + createJob := func(t *testing.T, id string) (job *structs.Job, cleanup func()) { + t.Helper() + job = mock.MinJob() + if id != "" { + job.ID = id + } + upsertJob(t, job) + cleanup = func() { + deleteJob(t, job) + } + t.Cleanup(cleanup) + return job, cleanup + } + + // this little cutie sets the latest state index to a predictable value, + // to ensure the below jobs span the boundary from 999->1000 which would + // break pagination without proper uint64 NextToken (ModifyIndex) comparison + must.NoError(t, s.State().UpsertNamespaces(996, nil)) + + // set up some jobs + // they should be in this order in state using the "modify_index" index, + // but the RPC will return them in reverse order by default. + jobs := make([]*structs.Job, 5) + var deleteJob0, deleteJob1, deleteJob2 func() + jobs[0], deleteJob0 = createJob(t, "job0") + jobs[1], deleteJob1 = createJob(t, "job1") + jobs[2], deleteJob2 = createJob(t, "job2") + jobs[3], _ = createJob(t, "job3") + jobs[4], _ = createJob(t, "job4") + + // request all jobs + resp := doRequest(t, &structs.JobStatusesRequest{}) + must.Len(t, 5, resp.Jobs) + + // make sure our state order assumption is correct + for i, j := range resp.Jobs { + reverse := len(jobs) - i - 1 + must.Eq(t, jobs[reverse].ID, j.ID, must.Sprintf("jobs not in order; idx=%d", i)) + } + + // test various single-job requests + + for _, tc := range []struct { + name string + qo structs.QueryOptions + jobs []structs.NamespacedID + expect *structs.Job + expectNext uint64 // NextToken (ModifyIndex) + }{ + { + name: "page 1", + qo: structs.QueryOptions{ + PerPage: 1, + }, + expect: jobs[4], + expectNext: jobs[3].ModifyIndex, + }, + { + name: "page 2", + qo: structs.QueryOptions{ + PerPage: 1, + NextToken: strconv.FormatUint(jobs[3].ModifyIndex, 10), + }, + expect: jobs[3], + expectNext: jobs[2].ModifyIndex, + }, + { + name: "reverse", + qo: structs.QueryOptions{ + PerPage: 1, + Reverse: true, + }, + expect: jobs[0], + expectNext: jobs[1].ModifyIndex, + }, + { + name: "filter", + qo: structs.QueryOptions{ + Filter: "ID == " + jobs[0].ID, + }, + expect: jobs[0], + }, + { + name: "specific", + jobs: []structs.NamespacedID{ + jobs[0].NamespacedID(), + }, + expect: jobs[0], + }, + { + name: "missing", + jobs: []structs.NamespacedID{ + { + ID: "do-not-exist", + Namespace: "anywhere", + }, + }, + expect: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + resp = doRequest(t, &structs.JobStatusesRequest{ + QueryOptions: tc.qo, + Jobs: tc.jobs, + }) + if tc.expect == nil { + must.Len(t, 0, resp.Jobs, must.Sprint("expect no jobs")) + } else { + must.Len(t, 1, resp.Jobs, must.Sprint("expect only one job")) + must.Eq(t, tc.expect.ID, resp.Jobs[0].ID) + } + expectToken := "" + if tc.expectNext > 0 { + expectToken = strconv.FormatUint(tc.expectNext, 10) + } + must.Eq(t, expectToken, resp.NextToken) + }) + } + + // test blocking queries + + // this endpoint should only unblock if something relevant changes. + // "something relevant" is why this seemingly redundant blocking-query + // testing is done here, as the logic to determine what is "relevant" is + // specific to this endpoint, meaning the latest ModifyIndex on each + // job/alloc/deployment seen while iterating, i.e. those "on-page". + + // blocking query helpers + startQuery := func(t *testing.T, req *structs.JobStatusesRequest) context.Context { + t.Helper() + if req == nil { + req = &structs.JobStatusesRequest{} + } + // context to signal when the query unblocks + // mustBlock and mustUnblock below work by checking ctx.Done() + ctx, cancel := context.WithCancel(context.Background()) + // default latest index to induce blocking + if req.QueryOptions.MinQueryIndex == 0 { + idx, err := s.State().LatestIndex() + must.NoError(t, err) + req.QueryOptions.MinQueryIndex = idx + } + // start the query + // note: queries that are expected to remain blocked leak this goroutine + // unless some other test (or cleanup) coincidentally unblocks it + go func() { + resp = doRequest(t, req) + cancel() + }() + // give it a moment for the rpc to actually start up and begin blocking + // FLAKE ALERT: if this job is flaky, this might be why. + time.Sleep(time.Millisecond * 100) + return ctx + } + mustBlock := func(t *testing.T, ctx context.Context) { + t.Helper() + timer := time.NewTimer(time.Millisecond * 200) + defer timer.Stop() + select { + case <-ctx.Done(): + t.Fatal("query should be blocked") + case <-timer.C: + } + } + mustUnblock := func(t *testing.T, ctx context.Context) { + t.Helper() + timer := time.NewTimer(time.Millisecond * 200) + defer timer.Stop() + select { + case <-ctx.Done(): + case <-timer.C: + t.Fatal("query should have unblocked") + } + } + + // alloc and deployment helpers + createAlloc := func(t *testing.T, job *structs.Job) { + t.Helper() + a := mock.MinAllocForJob(job) + must.NoError(t, + s.State().UpsertAllocs(structs.AllocUpdateRequestType, incIdx(t), []*structs.Allocation{a}), + must.Sprintf("error creating alloc for job %s", job.ID)) + t.Cleanup(func() { + test.NoError(t, s.State().DeleteEval(incIdx(t), []string{}, []string{a.ID}, false)) + }) + } + createDeployment := func(t *testing.T, job *structs.Job) { + t.Helper() + deploy := mock.Deployment() + deploy.JobID = job.ID + must.NoError(t, s.State().UpsertDeployment(incIdx(t), deploy)) + t.Cleanup(func() { + test.NoError(t, s.State().DeleteDeployment(incIdx(t), []string{deploy.ID})) + }) + } + + // these must be run in order, as they affect outer-scope state. + + for _, tc := range []struct { + name string + watch *structs.Job // optional specific job to query + run func(*testing.T) // run after starting the blocking query + check func(*testing.T, context.Context) // mustBlock or mustUnblock + }{ + { + name: "get all jobs", + check: mustBlock, + }, + { + name: "delete job", + run: func(_ *testing.T) { + deleteJob0() + }, + check: mustUnblock, + }, + { + name: "change job", + run: func(t *testing.T) { + jobs[1].Name = "job1-new-name" + upsertJob(t, jobs[1]) + }, + check: mustUnblock, + }, + { + name: "new job", + run: func(t *testing.T) { + createJob(t, "new1") + }, + check: mustUnblock, + }, + { + name: "delete job off page", + watch: jobs[2], + run: func(_ *testing.T) { + deleteJob1() + }, + check: mustBlock, + }, + { + name: "delete job on page", + watch: jobs[2], + run: func(_ *testing.T) { + deleteJob2() + }, + check: mustUnblock, + }, + { + name: "new alloc on page", + watch: jobs[3], + run: func(t *testing.T) { + createAlloc(t, jobs[3]) + }, + check: mustUnblock, + }, + { + name: "new alloc off page", + watch: jobs[3], + run: func(t *testing.T) { + createAlloc(t, jobs[4]) + }, + check: mustBlock, + }, + { + name: "new deployment on page", + watch: jobs[3], + run: func(t *testing.T) { + createDeployment(t, jobs[3]) + }, + check: mustUnblock, + }, + { + name: "new deployment off page", + watch: jobs[3], + run: func(t *testing.T) { + createDeployment(t, jobs[4]) + }, + check: mustBlock, + }, + } { + t.Run(tc.name, func(t *testing.T) { + req := &structs.JobStatusesRequest{} + if tc.watch != nil { + req.Jobs = []structs.NamespacedID{tc.watch.NamespacedID()} + } + ctx := startQuery(t, req) + if tc.run != nil { + tc.run(t) + } + tc.check(t, ctx) + }) + } +} diff --git a/nomad/mock/alloc.go b/nomad/mock/alloc.go index bfdd6ba35..170c3b54b 100644 --- a/nomad/mock/alloc.go +++ b/nomad/mock/alloc.go @@ -87,7 +87,10 @@ func Alloc() *structs.Allocation { } func MinAlloc() *structs.Allocation { - job := MinJob() + return MinAllocForJob(MinJob()) +} + +func MinAllocForJob(job *structs.Job) *structs.Allocation { group := job.TaskGroups[0] task := group.Tasks[0] return &structs.Allocation{ @@ -95,6 +98,8 @@ func MinAlloc() *structs.Allocation { EvalID: uuid.Generate(), NodeID: uuid.Generate(), Job: job, + JobID: job.ID, + Namespace: job.Namespace, TaskGroup: group.Name, ClientStatus: structs.AllocClientStatusPending, DesiredStatus: structs.AllocDesiredStatusRun, diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 93c77d0e4..e27261bf1 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -254,6 +254,15 @@ func jobTableSchema() *memdb.TableSchema { Field: "NodePool", }, }, + // ModifyIndex allows sorting by last-changed + "modify_index": { + Name: "modify_index", + AllowMissing: false, + Unique: true, + Indexer: &memdb.UintFieldIndex{ + Field: "ModifyIndex", + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 31d04cc6d..277f393ae 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2450,6 +2450,20 @@ func (s *StateStore) JobsByPool(ws memdb.WatchSet, pool string) (memdb.ResultIte return iter, nil } +// JobsByModifyIndex returns an iterator over all jobs, sorted by ModifyIndex. +func (s *StateStore) JobsByModifyIndex(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := getSorted(txn, sort, "jobs", "modify_index") + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + // JobSummaryByID returns a job summary object which matches a specific id. func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { txn := s.db.ReadTxn() diff --git a/nomad/structs/job.go b/nomad/structs/job.go index 8eb69b917..e57ef5c2e 100644 --- a/nomad/structs/job.go +++ b/nomad/structs/job.go @@ -16,6 +16,75 @@ const ( JobServiceRegistrationsRPCMethod = "Job.GetServiceRegistrations" ) +// JobStatusesRequest is used on the Job.Statuses RPC endpoint +// to get job/alloc/deployment status for jobs. +type JobStatusesRequest struct { + // Jobs may be optionally provided to request a subset of specific jobs. + Jobs []NamespacedID + // IncludeChildren will include child (batch) jobs in the response. + IncludeChildren bool + QueryOptions +} + +// JobStatusesResponse is the response from Job.Statuses RPC endpoint. +type JobStatusesResponse struct { + Jobs []JobStatusesJob + QueryMeta +} + +// JobStatusesJob collates information about a Job, its Allocation(s), +// and latest Deployment. +type JobStatusesJob struct { + NamespacedID + Name string + Type string + NodePool string + Datacenters []string + Priority int + Version uint64 + SubmitTime int64 + ModifyIndex uint64 + // Allocs contains information about current allocations + Allocs []JobStatusesAlloc + // GroupCountSum is the sum of all group{count=X} values, + // can be compared against number of running allocs to determine + // overall health for "service" jobs. + GroupCountSum int + // ChildStatuses contains the statuses of child (batch) jobs + ChildStatuses []string + // ParentID is set on child (batch) jobs, specifying the parent job ID + ParentID string + LatestDeployment *JobStatusesLatestDeployment +} + +// JobStatusesAlloc contains a subset of Allocation info. +type JobStatusesAlloc struct { + ID string + Group string + ClientStatus string + NodeID string + DeploymentStatus JobStatusesDeployment + JobVersion uint64 + FollowupEvalID string +} + +// JobStatusesDeployment contains a subset of AllocDeploymentStatus info. +type JobStatusesDeployment struct { + Canary bool + Healthy *bool +} + +// JobStatusesLatestDeployment contains a subset of the latest Deployment. +type JobStatusesLatestDeployment struct { + ID string + IsActive bool + JobVersion uint64 + Status string + StatusDescription string + AllAutoPromote bool + RequiresPromotion bool +} + // JobServiceRegistrationsRequest is the request object used to list all // service registrations belonging to the specified Job.ID. type JobServiceRegistrationsRequest struct {