api: new /v1/jobs/statuses endpoint for /ui/jobs page (#20130)

introduce a new API /v1/jobs/statuses, primarily for use in the UI,
which collates info about jobs, their allocations, and latest deployment.

currently the UI gets *all* of /v1/jobs and sorts and paginates them client-side
in the browser, and its "summary" column is based on historical summary data
(which can be visually misleading, and sometimes scary when a job has failed
at some point in the not-yet-garbage-collected past).

this does pagination and filtering and such, and returns jobs sorted by ModifyIndex,
so latest-changed jobs still come first. it pulls allocs and latest deployment
straight out of current state for more a more robust, holistic view of the job status.
it is less efficient per-job, due to the extra state lookups, but should be more efficient
per-page (excepting perhaps for job(s) with very-many allocs).

if a POST body is sent like `{"jobs": [{"namespace": "cool-ns", "id": "cool-job"}]}`,
then the response will be limited to that subset of jobs. the main goal here is to
prevent "jostling" the user in the UI when jobs come into and out of existence.

and if a blocking query is started with `?index=N`, then the query should only
unblock if jobs "on page" change, rather than any change to any of the state
tables being queried ("jobs", "allocs", and "deployment"), to save unnecessary
HTTP round trips.
This commit is contained in:
Daniel Bennett
2024-05-03 15:01:40 -05:00
committed by GitHub
parent 54fc146432
commit cf87a556b3
12 changed files with 1131 additions and 1 deletions

3
.changelog/20130.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
api: new /v1/jobs/statuses endpoint collates details about jobs' allocs and latest deployment, intended for use in the updated UI jobs index page
```

View File

@@ -1544,3 +1544,12 @@ func (j *Jobs) ActionExec(ctx context.Context,
return s.run(ctx)
}
// JobStatusesRequest is used to get statuses for jobs,
// their allocations and deployments.
type JobStatusesRequest struct {
// Jobs may be optionally provided to request a subset of specific jobs.
Jobs []NamespacedID
// IncludeChildren will include child (batch) jobs in the response.
IncludeChildren bool
}

View File

@@ -156,3 +156,12 @@ func (n NamespaceIndexSort) Less(i, j int) bool {
func (n NamespaceIndexSort) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// NamespacedID is used for things that are unique only per-namespace,
// such as jobs.
type NamespacedID struct {
// Namespace is the Name of the Namespace
Namespace string
// ID is the ID of the namespaced object (e.g. Job ID)
ID string
}

View File

@@ -382,6 +382,7 @@ func (s *HTTPServer) ResolveToken(req *http.Request) (*acl.ACL, error) {
func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest))
s.mux.HandleFunc("/v1/jobs/parse", s.wrap(s.JobsParseRequest))
s.mux.HandleFunc("/v1/jobs/statuses", s.wrap(s.JobStatusesRequest))
s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest))
s.mux.HandleFunc("/v1/nodes", s.wrap(s.NodesRequest))

View File

@@ -0,0 +1,79 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package agent
import (
"fmt"
"net/http"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
)
// JobStatusesRequest looks up the status of jobs' allocs and deployments,
// primarily for use in the UI on the /ui/jobs index page.
func (s *HTTPServer) JobStatusesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var out structs.JobStatusesResponse
args := structs.JobStatusesRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
switch req.Method {
case http.MethodGet, http.MethodPost:
break
default:
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
if includeChildren, err := parseBool(req, "include_children"); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
} else if includeChildren != nil {
args.IncludeChildren = *includeChildren
}
// ostensibly GETs should not accept structured body, but the HTTP spec
// on this is more what you'd call "guidelines" than actual rules.
if req.Body != nil && req.Body != http.NoBody {
var in api.JobStatusesRequest
if err := decodeBody(req, &in); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("error decoding request: %v", err))
}
if len(in.Jobs) == 0 {
return nil, CodedError(http.StatusBadRequest, "no jobs in request")
}
// each job has a separate namespace, so in case the NSes are mixed,
// default to wildcard.
// if all requested jobs turn out to have the same namespace,
// then the RPC endpoint will notice that and override this anyway.
if args.QueryOptions.Namespace == structs.DefaultNamespace {
args.QueryOptions.Namespace = structs.AllNamespacesSentinel
}
args.Jobs = make([]structs.NamespacedID, len(in.Jobs))
for i, j := range in.Jobs {
if j.Namespace == "" {
j.Namespace = structs.DefaultNamespace
}
args.Jobs[i] = structs.NamespacedID{
ID: j.ID,
Namespace: j.Namespace,
}
}
// not a direct assignment, because if it is false (default),
// it could override the "include_children" query param.
if in.IncludeChildren {
args.IncludeChildren = true
}
}
if err := s.agent.RPC("Job.Statuses", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Jobs, nil
}

View File

@@ -0,0 +1,247 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package agent
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
)
func TestJobEndpoint_Statuses(t *testing.T) {
ci.Parallel(t)
httpTest(t, cb, func(s *TestAgent) {
apiPath := "/v1/jobs/statuses"
parent := mock.MinJob()
parent.ID = "parent"
child := mock.MinJob()
child.ID = "parent/child"
child.ParentID = "parent"
otherNS := mock.MinJob()
otherNS.ID = "otherNS"
otherNS.Namespace = "other"
// lil helpers
registerJob := func(t *testing.T, job *structs.Job) {
must.NoError(t, s.Agent.RPC("Job.Register",
&structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}, &structs.JobRegisterResponse{}),
)
}
createNamespace := func(t *testing.T, ns string) {
must.NoError(t, s.Agent.RPC("Namespace.UpsertNamespaces",
&structs.NamespaceUpsertRequest{
Namespaces: []*structs.Namespace{{
Name: ns,
}},
WriteRequest: structs.WriteRequest{Region: "global"},
}, &structs.GenericResponse{}))
}
buildRequest := func(t *testing.T, method, url, body string) *http.Request {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)
var reqBody io.Reader = http.NoBody
if body != "" {
reqBody = bytes.NewReader([]byte(body))
}
req, err := http.NewRequestWithContext(ctx, method, url, reqBody)
must.NoError(t, err)
return req
}
// note: this api will return jobs ordered by ModifyIndex,
// so in reverse order of their creation here.
registerJob(t, parent)
registerJob(t, child)
createNamespace(t, otherNS.Namespace)
registerJob(t, otherNS)
testCases := []struct {
name string
// request
method, params, body string
// response
expectCode int
expectErr string
expectIDs []string
expectHeaders []string
}{
{
name: "bad method", method: "LOL",
expectCode: 405, expectErr: ErrInvalidMethod,
},
{
name: "bad request param",
params: "?include_children=not-a-bool",
expectCode: 400, expectErr: `Failed to parse value of "include_children"`,
},
{
name: "get ok",
expectIDs: []string{"parent"},
},
{
name: "get all namespaces",
params: "?namespace=*",
expectIDs: []string{"otherNS", "parent"},
},
{
name: "get all reverse",
params: "?namespace=*&reverse=true",
expectIDs: []string{"parent", "otherNS"},
},
{
name: "get one page",
params: "?namespace=*&per_page=1",
expectIDs: []string{"otherNS"},
expectHeaders: []string{"X-Nomad-NextToken"},
},
{
name: "get children",
params: "?include_children=true",
expectIDs: []string{"parent/child", "parent"},
},
{
name: "get children filter",
// this is how the UI does parent job pages
params: "?include_children=true&filter=ParentID == parent",
expectIDs: []string{"parent/child"},
},
// POST and GET are interchangeable, but by convention, the UI will
// POST when sending a request body, so here we test like that too.
{
name: "post no jobs",
method: "POST",
body: `{"jobs": []}`,
expectCode: 400, expectErr: "no jobs in request",
},
{
name: "post bad body",
method: "POST", body: "{malformed",
expectCode: 400, expectErr: "error decoding request: invalid character 'm'",
},
{
name: "post nonexistent job",
method: "POST",
body: `{"jobs": [{"id": "whatever", "namespace": "nope"}]}`,
expectIDs: []string{},
},
{
name: "post single job",
method: "POST",
body: `{"jobs": [{"id": "parent"}]}`,
expectIDs: []string{"parent"},
},
{
name: "post all namespaces",
method: "POST",
// no ?namespace param required, because we default to "*"
// if there is a request body (and ns query is "default")
body: `{"jobs": [{"id": "parent"}, {"id": "otherNS", "namespace": "other"}]}`,
expectIDs: []string{"otherNS", "parent"},
},
{
name: "post auto namespace",
method: "POST",
// namespace gets overridden by the RPC endpoint,
// because jobs in the request body are all one namespace.
params: "?namespace=nope",
body: `{"jobs": [{"id": "parent", "namespace": "default"}]}`,
expectIDs: []string{"parent"},
},
{
name: "post auto namespaces other",
method: "POST",
// "other" namespace should be auto-detected, as it's the only one
body: `{"jobs": [{"id": "otherNS", "namespace": "other"}]}`,
expectIDs: []string{"otherNS"},
},
{
name: "post wrong namespace param",
method: "POST",
params: "?namespace=nope",
// namespace can not be auto-detected, since there are two here,
// so it uses the provided param
body: `{"jobs": [{"id": "parent"}, {"id": "otherNS", "namespace": "other"}]}`,
expectIDs: []string{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// default happy path values
if tc.method == "" {
tc.method = "GET"
}
if tc.expectCode == 0 {
tc.expectCode = 200
}
req := buildRequest(t, tc.method, apiPath+tc.params, tc.body)
recorder := httptest.NewRecorder()
// method under test!
raw, err := s.Server.JobStatusesRequest(recorder, req)
// sad path
if tc.expectErr != "" {
must.ErrorContains(t, err, tc.expectErr)
var coded *codedError
must.True(t, errors.As(err, &coded))
must.Eq(t, tc.expectCode, coded.code)
must.Nil(t, raw)
return
}
// happy path
must.NoError(t, err)
result := recorder.Result()
must.Eq(t, tc.expectCode, result.StatusCode)
// check response body
jobs := raw.([]structs.JobStatusesJob)
gotIDs := make([]string, len(jobs))
for i, j := range jobs {
gotIDs[i] = j.ID
}
must.Eq(t, tc.expectIDs, gotIDs)
// check headers
expectHeaders := append(
[]string{
"X-Nomad-Index",
"X-Nomad-Lastcontact",
"X-Nomad-Knownleader",
},
tc.expectHeaders...,
)
for _, h := range expectHeaders {
test.NotEq(t, "", result.Header.Get(h),
test.Sprintf("expect '%s' header", h))
}
})
}
})
}

View File

@@ -0,0 +1,303 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package nomad
import (
"errors"
"net/http"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-set/v2"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
// Statuses looks up info about jobs, their allocs, and latest deployment.
func (j *Job) Statuses(
args *structs.JobStatusesRequest,
reply *structs.JobStatusesResponse) error {
authErr := j.srv.Authenticate(j.ctx, args)
if done, err := j.srv.forward("Job.Statuses", args, args, reply); done {
return err
}
j.srv.MeasureRPCRate("job", structs.RateMetricList, args)
if authErr != nil {
return structs.ErrPermissionDenied
}
defer metrics.MeasureSince([]string{"nomad", "jobs", "statuses"}, time.Now())
namespace := args.RequestNamespace()
// the namespace from the UI by default is "*", but if specific jobs are
// requested, all with the same namespace, AllowNsOp() below may be able
// to quickly deny the request if the token lacks permissions for that ns,
// rather than iterating the whole jobs table and filtering out every job.
if len(args.Jobs) > 0 {
nses := set.New[string](1)
for _, j := range args.Jobs {
nses.Insert(j.Namespace)
}
if nses.Size() == 1 {
namespace = nses.Slice()[0]
}
}
// check for read-job permissions, since this endpoint includes alloc info
// and possibly a deployment ID, and those APIs require read-job.
aclObj, err := j.srv.ResolveACL(args)
if err != nil {
return err
}
if !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
allow := aclObj.AllowNsOpFunc(acl.NamespaceCapabilityReadJob)
store := j.srv.State()
// get the namespaces the user is allowed to access.
allowableNamespaces, err := allowedNSes(aclObj, store, allow)
if errors.Is(err, structs.ErrPermissionDenied) {
// return empty jobs if token isn't authorized for any
// namespace, matching other endpoints
reply.Jobs = make([]structs.JobStatusesJob, 0)
return nil
} else if err != nil {
return err
}
// since the state index we're using doesn't include namespace,
// explicitly add the user-provided ns to our filter if needed.
// (allowableNamespaces will be nil if the caller sent a mgmt token)
if allowableNamespaces == nil &&
namespace != "" &&
namespace != structs.AllNamespacesSentinel {
allowableNamespaces = map[string]bool{
namespace: true,
}
}
// compare between state run() unblocks to see if the RPC, as a whole,
// should unblock. i.e. if new jobs shift the page, or when jobs go away.
prevJobs := set.New[structs.NamespacedID](0)
// because the state index is in order of ModifyIndex, lowest to highest,
// SortDefault would show oldest jobs first, so instead invert the default
// to show most recent job changes first.
args.QueryOptions.Reverse = !args.QueryOptions.Reverse
sort := state.QueryOptionSort(args.QueryOptions)
// special blocking note: this endpoint employs an unconventional method
// of determining the reply.Index in order to avoid unblocking when
// something changes "off page" -- instead of using the latest index
// from any/all of the state tables queried here (all of: "jobs", "allocs",
// "deployments"), we use the highest ModifyIndex of all items encountered
// while iterating.
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
var err error
var iter memdb.ResultIterator
// the UI jobs index page shows most-recently changed first.
iter, err = state.JobsByModifyIndex(ws, sort)
if err != nil {
return err
}
// set up tokenizer and filters
tokenizer := paginator.NewStructsTokenizer(
iter,
paginator.StructsTokenizerOptions{
OnlyModifyIndex: true,
},
)
filters := []paginator.Filter{
paginator.NamespaceFilter{
AllowableNamespaces: allowableNamespaces,
},
// skip child jobs unless requested to include them
paginator.GenericFilter{Allow: func(i interface{}) (bool, error) {
if args.IncludeChildren {
return true, nil
}
job := i.(*structs.Job)
return job.ParentID == "", nil
}},
}
// only provide specific jobs if requested.
if len(args.Jobs) > 0 {
// set per-page to avoid iterating the whole table
args.QueryOptions.PerPage = int32(len(args.Jobs))
// filter in the requested jobs
jobSet := set.From[structs.NamespacedID](args.Jobs)
filters = append(filters, paginator.GenericFilter{
Allow: func(i interface{}) (bool, error) {
job := i.(*structs.Job)
return jobSet.Contains(job.NamespacedID()), nil
},
})
}
jobs := make([]structs.JobStatusesJob, 0)
newJobs := set.New[structs.NamespacedID](0)
pager, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions,
func(raw interface{}) error {
job := raw.(*structs.Job)
// this is where the sausage is made
jsj, highestIndexOnPage, err := jobStatusesJobFromJob(ws, state, job)
if err != nil {
return err
}
jobs = append(jobs, jsj)
newJobs.Insert(job.NamespacedID())
// by using the highest index we find on any job/alloc/
// deployment among the jobs on the page, instead of the
// latest index for any particular state table, we can
// avoid unblocking the RPC if something changes "off page"
if highestIndexOnPage > reply.Index {
reply.Index = highestIndexOnPage
}
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusInternalServerError, "failed to create result paginator: %v", err)
}
nextToken, err := pager.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusInternalServerError, "failed to read result page: %v", err)
}
// if the page has updated, or a job has gone away,
// bump the index to latest jobs entry.
if !prevJobs.Empty() && !newJobs.Equal(prevJobs) {
reply.Index, err = state.Index("jobs")
if err != nil {
return err
}
}
prevJobs = newJobs
reply.QueryMeta.NextToken = nextToken
reply.Jobs = jobs
return nil
}}
return j.srv.blockingRPC(&opts)
}
func jobStatusesJobFromJob(ws memdb.WatchSet, store *state.StateStore, job *structs.Job) (structs.JobStatusesJob, uint64, error) {
highestIdx := job.ModifyIndex
jsj := structs.JobStatusesJob{
NamespacedID: structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
},
Name: job.Name,
Type: job.Type,
NodePool: job.NodePool,
Datacenters: job.Datacenters,
Priority: job.Priority,
Version: job.Version,
ParentID: job.ParentID,
SubmitTime: job.SubmitTime,
ModifyIndex: job.ModifyIndex,
// included here for completeness, populated below.
Allocs: nil,
GroupCountSum: 0,
ChildStatuses: nil,
LatestDeployment: nil,
}
// the GroupCountSum will map to how many allocations we expect to run
// (for service jobs)
for _, tg := range job.TaskGroups {
jsj.GroupCountSum += tg.Count
}
// collect the statuses of child jobs
if job.IsParameterized() || job.IsPeriodic() {
jsj.ChildStatuses = make([]string, 0) // set to not-nil
children, err := store.JobsByIDPrefix(ws, job.Namespace, job.ID, state.SortDefault)
if err != nil {
return jsj, highestIdx, err
}
for {
child := children.Next()
if child == nil {
break
}
j := child.(*structs.Job)
// note: this filters out grandchildren jobs (children of children)
if j.ParentID != job.ID {
continue
}
if j.ModifyIndex > highestIdx {
highestIdx = j.ModifyIndex
}
jsj.ChildStatuses = append(jsj.ChildStatuses, j.Status)
}
// no allocs or deployments for parameterized/period jobs,
// so we're done here.
return jsj, highestIdx, err
}
// collect info about allocations
allocs, err := store.AllocsByJob(ws, job.Namespace, job.ID, true)
if err != nil {
return jsj, highestIdx, err
}
for _, a := range allocs {
jsa := structs.JobStatusesAlloc{
ID: a.ID,
Group: a.TaskGroup,
ClientStatus: a.ClientStatus,
NodeID: a.NodeID,
JobVersion: a.Job.Version,
FollowupEvalID: a.FollowupEvalID,
}
if a.DeploymentStatus != nil {
jsa.DeploymentStatus.Canary = a.DeploymentStatus.IsCanary()
jsa.DeploymentStatus.Healthy = a.DeploymentStatus.Healthy
}
jsj.Allocs = append(jsj.Allocs, jsa)
if a.ModifyIndex > highestIdx {
highestIdx = a.ModifyIndex
}
}
// look for latest deployment
deploy, err := store.LatestDeploymentByJobID(ws, job.Namespace, job.ID)
if err != nil {
return jsj, highestIdx, err
}
if deploy != nil {
jsj.LatestDeployment = &structs.JobStatusesLatestDeployment{
ID: deploy.ID,
IsActive: deploy.Active(),
JobVersion: deploy.JobVersion,
Status: deploy.Status,
StatusDescription: deploy.StatusDescription,
AllAutoPromote: deploy.HasAutoPromote(),
RequiresPromotion: deploy.RequiresPromotion(),
}
if deploy.ModifyIndex > highestIdx {
highestIdx = deploy.ModifyIndex
}
}
return jsj, highestIdx, nil
}

View File

@@ -0,0 +1,382 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package nomad
import (
"context"
"strconv"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
)
func TestJob_Statuses_ACL(t *testing.T) {
s, _, cleanup := TestACLServer(t, nil)
t.Cleanup(cleanup)
testutil.WaitForLeader(t, s.RPC)
insufficientToken := mock.CreatePolicyAndToken(t, s.State(), 1, "job-lister",
mock.NamespacePolicy("default", "", []string{"list-jobs"}))
happyToken := mock.CreatePolicyAndToken(t, s.State(), 2, "job-reader",
mock.NamespacePolicy("default", "", []string{"read-job"}))
for _, tc := range []struct {
name, token, err string
}{
{"no token", "", "Permission denied"},
{"insufficient perms", insufficientToken.SecretID, "Permission denied"},
{"happy token", happyToken.SecretID, ""},
} {
t.Run(tc.name, func(t *testing.T) {
req := &structs.JobStatusesRequest{}
req.QueryOptions.Region = "global"
req.QueryOptions.AuthToken = tc.token
var resp structs.JobStatusesResponse
err := s.RPC("Job.Statuses", &req, &resp)
if tc.err != "" {
must.ErrorContains(t, err, tc.err)
} else {
must.NoError(t, err)
}
})
}
}
func TestJob_Statuses(t *testing.T) {
s, cleanup := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
t.Cleanup(cleanup)
testutil.WaitForLeader(t, s.RPC)
// method under test
doRequest := func(t *testing.T, req *structs.JobStatusesRequest) (resp structs.JobStatusesResponse) {
t.Helper()
must.NotNil(t, req, must.Sprint("request must not be nil"))
req.QueryOptions.Region = "global"
must.NoError(t, s.RPC("Job.Statuses", req, &resp))
return resp
}
// increment state index helper
incIdx := func(t *testing.T) uint64 {
t.Helper()
idx, err := s.State().LatestIndex()
must.NoError(t, err)
return idx + 1
}
// job helpers
deleteJob := func(t *testing.T, job *structs.Job) {
t.Helper()
err := s.State().DeleteJob(incIdx(t), job.Namespace, job.ID)
if err != nil && err.Error() == "job not found" {
return
}
must.NoError(t, err)
}
upsertJob := func(t *testing.T, job *structs.Job) {
t.Helper()
err := s.State().UpsertJob(structs.MsgTypeTestSetup, incIdx(t), nil, job)
must.NoError(t, err)
}
createJob := func(t *testing.T, id string) (job *structs.Job, cleanup func()) {
t.Helper()
job = mock.MinJob()
if id != "" {
job.ID = id
}
upsertJob(t, job)
cleanup = func() {
deleteJob(t, job)
}
t.Cleanup(cleanup)
return job, cleanup
}
// this little cutie sets the latest state index to a predictable value,
// to ensure the below jobs span the boundary from 999->1000 which would
// break pagination without proper uint64 NextToken (ModifyIndex) comparison
must.NoError(t, s.State().UpsertNamespaces(996, nil))
// set up some jobs
// they should be in this order in state using the "modify_index" index,
// but the RPC will return them in reverse order by default.
jobs := make([]*structs.Job, 5)
var deleteJob0, deleteJob1, deleteJob2 func()
jobs[0], deleteJob0 = createJob(t, "job0")
jobs[1], deleteJob1 = createJob(t, "job1")
jobs[2], deleteJob2 = createJob(t, "job2")
jobs[3], _ = createJob(t, "job3")
jobs[4], _ = createJob(t, "job4")
// request all jobs
resp := doRequest(t, &structs.JobStatusesRequest{})
must.Len(t, 5, resp.Jobs)
// make sure our state order assumption is correct
for i, j := range resp.Jobs {
reverse := len(jobs) - i - 1
must.Eq(t, jobs[reverse].ID, j.ID, must.Sprintf("jobs not in order; idx=%d", i))
}
// test various single-job requests
for _, tc := range []struct {
name string
qo structs.QueryOptions
jobs []structs.NamespacedID
expect *structs.Job
expectNext uint64 // NextToken (ModifyIndex)
}{
{
name: "page 1",
qo: structs.QueryOptions{
PerPage: 1,
},
expect: jobs[4],
expectNext: jobs[3].ModifyIndex,
},
{
name: "page 2",
qo: structs.QueryOptions{
PerPage: 1,
NextToken: strconv.FormatUint(jobs[3].ModifyIndex, 10),
},
expect: jobs[3],
expectNext: jobs[2].ModifyIndex,
},
{
name: "reverse",
qo: structs.QueryOptions{
PerPage: 1,
Reverse: true,
},
expect: jobs[0],
expectNext: jobs[1].ModifyIndex,
},
{
name: "filter",
qo: structs.QueryOptions{
Filter: "ID == " + jobs[0].ID,
},
expect: jobs[0],
},
{
name: "specific",
jobs: []structs.NamespacedID{
jobs[0].NamespacedID(),
},
expect: jobs[0],
},
{
name: "missing",
jobs: []structs.NamespacedID{
{
ID: "do-not-exist",
Namespace: "anywhere",
},
},
expect: nil,
},
} {
t.Run(tc.name, func(t *testing.T) {
resp = doRequest(t, &structs.JobStatusesRequest{
QueryOptions: tc.qo,
Jobs: tc.jobs,
})
if tc.expect == nil {
must.Len(t, 0, resp.Jobs, must.Sprint("expect no jobs"))
} else {
must.Len(t, 1, resp.Jobs, must.Sprint("expect only one job"))
must.Eq(t, tc.expect.ID, resp.Jobs[0].ID)
}
expectToken := ""
if tc.expectNext > 0 {
expectToken = strconv.FormatUint(tc.expectNext, 10)
}
must.Eq(t, expectToken, resp.NextToken)
})
}
// test blocking queries
// this endpoint should only unblock if something relevant changes.
// "something relevant" is why this seemingly redundant blocking-query
// testing is done here, as the logic to determine what is "relevant" is
// specific to this endpoint, meaning the latest ModifyIndex on each
// job/alloc/deployment seen while iterating, i.e. those "on-page".
// blocking query helpers
startQuery := func(t *testing.T, req *structs.JobStatusesRequest) context.Context {
t.Helper()
if req == nil {
req = &structs.JobStatusesRequest{}
}
// context to signal when the query unblocks
// mustBlock and mustUnblock below work by checking ctx.Done()
ctx, cancel := context.WithCancel(context.Background())
// default latest index to induce blocking
if req.QueryOptions.MinQueryIndex == 0 {
idx, err := s.State().LatestIndex()
must.NoError(t, err)
req.QueryOptions.MinQueryIndex = idx
}
// start the query
// note: queries that are expected to remain blocked leak this goroutine
// unless some other test (or cleanup) coincidentally unblocks it
go func() {
resp = doRequest(t, req)
cancel()
}()
// give it a moment for the rpc to actually start up and begin blocking
// FLAKE ALERT: if this job is flaky, this might be why.
time.Sleep(time.Millisecond * 100)
return ctx
}
mustBlock := func(t *testing.T, ctx context.Context) {
t.Helper()
timer := time.NewTimer(time.Millisecond * 200)
defer timer.Stop()
select {
case <-ctx.Done():
t.Fatal("query should be blocked")
case <-timer.C:
}
}
mustUnblock := func(t *testing.T, ctx context.Context) {
t.Helper()
timer := time.NewTimer(time.Millisecond * 200)
defer timer.Stop()
select {
case <-ctx.Done():
case <-timer.C:
t.Fatal("query should have unblocked")
}
}
// alloc and deployment helpers
createAlloc := func(t *testing.T, job *structs.Job) {
t.Helper()
a := mock.MinAllocForJob(job)
must.NoError(t,
s.State().UpsertAllocs(structs.AllocUpdateRequestType, incIdx(t), []*structs.Allocation{a}),
must.Sprintf("error creating alloc for job %s", job.ID))
t.Cleanup(func() {
test.NoError(t, s.State().DeleteEval(incIdx(t), []string{}, []string{a.ID}, false))
})
}
createDeployment := func(t *testing.T, job *structs.Job) {
t.Helper()
deploy := mock.Deployment()
deploy.JobID = job.ID
must.NoError(t, s.State().UpsertDeployment(incIdx(t), deploy))
t.Cleanup(func() {
test.NoError(t, s.State().DeleteDeployment(incIdx(t), []string{deploy.ID}))
})
}
// these must be run in order, as they affect outer-scope state.
for _, tc := range []struct {
name string
watch *structs.Job // optional specific job to query
run func(*testing.T) // run after starting the blocking query
check func(*testing.T, context.Context) // mustBlock or mustUnblock
}{
{
name: "get all jobs",
check: mustBlock,
},
{
name: "delete job",
run: func(_ *testing.T) {
deleteJob0()
},
check: mustUnblock,
},
{
name: "change job",
run: func(t *testing.T) {
jobs[1].Name = "job1-new-name"
upsertJob(t, jobs[1])
},
check: mustUnblock,
},
{
name: "new job",
run: func(t *testing.T) {
createJob(t, "new1")
},
check: mustUnblock,
},
{
name: "delete job off page",
watch: jobs[2],
run: func(_ *testing.T) {
deleteJob1()
},
check: mustBlock,
},
{
name: "delete job on page",
watch: jobs[2],
run: func(_ *testing.T) {
deleteJob2()
},
check: mustUnblock,
},
{
name: "new alloc on page",
watch: jobs[3],
run: func(t *testing.T) {
createAlloc(t, jobs[3])
},
check: mustUnblock,
},
{
name: "new alloc off page",
watch: jobs[3],
run: func(t *testing.T) {
createAlloc(t, jobs[4])
},
check: mustBlock,
},
{
name: "new deployment on page",
watch: jobs[3],
run: func(t *testing.T) {
createDeployment(t, jobs[3])
},
check: mustUnblock,
},
{
name: "new deployment off page",
watch: jobs[3],
run: func(t *testing.T) {
createDeployment(t, jobs[4])
},
check: mustBlock,
},
} {
t.Run(tc.name, func(t *testing.T) {
req := &structs.JobStatusesRequest{}
if tc.watch != nil {
req.Jobs = []structs.NamespacedID{tc.watch.NamespacedID()}
}
ctx := startQuery(t, req)
if tc.run != nil {
tc.run(t)
}
tc.check(t, ctx)
})
}
}

View File

@@ -87,7 +87,10 @@ func Alloc() *structs.Allocation {
}
func MinAlloc() *structs.Allocation {
job := MinJob()
return MinAllocForJob(MinJob())
}
func MinAllocForJob(job *structs.Job) *structs.Allocation {
group := job.TaskGroups[0]
task := group.Tasks[0]
return &structs.Allocation{
@@ -95,6 +98,8 @@ func MinAlloc() *structs.Allocation {
EvalID: uuid.Generate(),
NodeID: uuid.Generate(),
Job: job,
JobID: job.ID,
Namespace: job.Namespace,
TaskGroup: group.Name,
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,

View File

@@ -254,6 +254,15 @@ func jobTableSchema() *memdb.TableSchema {
Field: "NodePool",
},
},
// ModifyIndex allows sorting by last-changed
"modify_index": {
Name: "modify_index",
AllowMissing: false,
Unique: true,
Indexer: &memdb.UintFieldIndex{
Field: "ModifyIndex",
},
},
},
}
}

View File

@@ -2450,6 +2450,20 @@ func (s *StateStore) JobsByPool(ws memdb.WatchSet, pool string) (memdb.ResultIte
return iter, nil
}
// JobsByModifyIndex returns an iterator over all jobs, sorted by ModifyIndex.
func (s *StateStore) JobsByModifyIndex(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := getSorted(txn, sort, "jobs", "modify_index")
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// JobSummaryByID returns a job summary object which matches a specific id.
func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) {
txn := s.db.ReadTxn()

View File

@@ -16,6 +16,75 @@ const (
JobServiceRegistrationsRPCMethod = "Job.GetServiceRegistrations"
)
// JobStatusesRequest is used on the Job.Statuses RPC endpoint
// to get job/alloc/deployment status for jobs.
type JobStatusesRequest struct {
// Jobs may be optionally provided to request a subset of specific jobs.
Jobs []NamespacedID
// IncludeChildren will include child (batch) jobs in the response.
IncludeChildren bool
QueryOptions
}
// JobStatusesResponse is the response from Job.Statuses RPC endpoint.
type JobStatusesResponse struct {
Jobs []JobStatusesJob
QueryMeta
}
// JobStatusesJob collates information about a Job, its Allocation(s),
// and latest Deployment.
type JobStatusesJob struct {
NamespacedID
Name string
Type string
NodePool string
Datacenters []string
Priority int
Version uint64
SubmitTime int64
ModifyIndex uint64
// Allocs contains information about current allocations
Allocs []JobStatusesAlloc
// GroupCountSum is the sum of all group{count=X} values,
// can be compared against number of running allocs to determine
// overall health for "service" jobs.
GroupCountSum int
// ChildStatuses contains the statuses of child (batch) jobs
ChildStatuses []string
// ParentID is set on child (batch) jobs, specifying the parent job ID
ParentID string
LatestDeployment *JobStatusesLatestDeployment
}
// JobStatusesAlloc contains a subset of Allocation info.
type JobStatusesAlloc struct {
ID string
Group string
ClientStatus string
NodeID string
DeploymentStatus JobStatusesDeployment
JobVersion uint64
FollowupEvalID string
}
// JobStatusesDeployment contains a subset of AllocDeploymentStatus info.
type JobStatusesDeployment struct {
Canary bool
Healthy *bool
}
// JobStatusesLatestDeployment contains a subset of the latest Deployment.
type JobStatusesLatestDeployment struct {
ID string
IsActive bool
JobVersion uint64
Status string
StatusDescription string
AllAutoPromote bool
RequiresPromotion bool
}
// JobServiceRegistrationsRequest is the request object used to list all
// service registrations belonging to the specified Job.ID.
type JobServiceRegistrationsRequest struct {