api: enable support for setting original job source (#16763)

* api: enable support for setting original source alongside job

This PR adds support for setting job source material along with
the registration of a job.

This includes a new HTTP endpoint and a new RPC endpoint for
making queries for the original source of a job. The
HTTP endpoint is /v1/job/<id>/submission?version=<version> and
the RPC method is Job.GetJobSubmission.

The job source (if submitted; doing so is always optional) is
stored in the job_submission memdb table, separately from the
actual job. This way we do not incur the overhead of reading the
large string field throughout normal job operations.

The server config now includes job_max_source_size for configuring
the maximum size the job source may be, before the server simply
drops the source material. This should help prevent Bad Things from
happening when huge jobs are submitted. If the value is set to 0,
all job source material will be dropped.

* api: avoid writing var content to disk for parsing

* api: move submission validation into RPC layer

* api: return an error if updating a job submission without namespace or job id

* api: be exact about the job index we associate a submission with (modify)

* api: reword api docs scheduling

* api: prune all but the last 6 job submissions

* api: protect against nil job submission in job validation

* api: set max job source size in test server

* api: fixups from pr
This commit is contained in:
Seth Hoenig
2023-04-11 08:45:08 -05:00
committed by GitHub
parent 27b5596b9c
commit 2c44cbb001
79 changed files with 1987 additions and 654 deletions

View File

@@ -17,6 +17,7 @@ import (
"time"
metrics "github.com/armon/go-metrics"
"github.com/dustin/go-humanize"
consulapi "github.com/hashicorp/consul/api"
log "github.com/hashicorp/go-hclog"
uuidparse "github.com/hashicorp/go-uuid"
@@ -29,6 +30,7 @@ import (
"github.com/hashicorp/nomad/helper/bufconndialer"
"github.com/hashicorp/nomad/helper/escapingfs"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad"
@@ -163,7 +165,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i
// convertServerConfig takes an agent config and log output and returns a Nomad
// Config. There may be missing fields that must be set by the agent. To do this
// call finalizeServerConfig
// call finalizeServerConfig.
func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf := agentConfig.NomadConfig
if conf == nil {
@@ -574,6 +576,16 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf.RaftBoltNoFreelistSync = bolt.NoFreelistSync
}
// Interpret job_max_source_size as bytes from string value
if agentConfig.Server.JobMaxSourceSize == nil {
agentConfig.Server.JobMaxSourceSize = pointer.Of("1M")
}
jobMaxSourceBytes, err := humanize.ParseBytes(*agentConfig.Server.JobMaxSourceSize)
if err != nil {
return nil, fmt.Errorf("failed to parse max job source bytes: %w", err)
}
conf.JobMaxSourceSize = int(jobMaxSourceBytes)
return conf, nil
}
@@ -606,7 +618,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
return nil, err
}
if err := a.finalizeClientConfig(c); err != nil {
if err = a.finalizeClientConfig(c); err != nil {
return nil, err
}

View File

@@ -678,7 +678,7 @@ func TestAgent_ServerConfig_RaftProtocol_3(t *testing.T) {
}
}
func TestAgent_ClientConfig(t *testing.T) {
func TestAgent_ClientConfig_discovery(t *testing.T) {
ci.Parallel(t)
conf := DefaultConfig()
conf.Client.Enabled = true
@@ -730,6 +730,21 @@ func TestAgent_ClientConfig(t *testing.T) {
require.False(t, c.NomadServiceDiscovery)
}
// TestAgent_ClientConfig_JobMaxSourceSize checks that the dev config sets
// JobMaxSourceSize to "1M", and that building the server config still yields
// a 1e6 byte limit after the agent-level value has been cleared.
func TestAgent_ClientConfig_JobMaxSourceSize(t *testing.T) {
	ci.Parallel(t)

	cfg := DevConfig(nil)
	must.Eq(t, cfg.Server.JobMaxSourceSize, pointer.Of("1M"))
	must.NoError(t, cfg.normalizeAddrs())

	// Unset the value; the server config produced below must still carry
	// the default limit.
	cfg.Server.JobMaxSourceSize = nil
	a := &Agent{config: cfg}

	converted, err := a.serverConfig()
	must.NoError(t, err)
	must.Eq(t, 1e6, converted.JobMaxSourceSize)
}
func TestAgent_ClientConfig_ReservedCores(t *testing.T) {
ci.Parallel(t)
conf := DefaultConfig()
@@ -738,30 +753,28 @@ func TestAgent_ClientConfig_ReservedCores(t *testing.T) {
conf.Client.Reserved.Cores = "0,2-3"
a := &Agent{config: conf}
c, err := a.clientConfig()
require.NoError(t, err)
require.Exactly(t, []uint16{0, 1, 2, 3, 4, 5, 6, 7}, c.ReservableCores)
require.Exactly(t, []uint16{0, 2, 3}, c.Node.ReservedResources.Cpu.ReservedCpuCores)
must.NoError(t, err)
must.Eq(t, []uint16{0, 1, 2, 3, 4, 5, 6, 7}, c.ReservableCores)
must.Eq(t, []uint16{0, 2, 3}, c.Node.ReservedResources.Cpu.ReservedCpuCores)
}
// Clients should inherit telemetry configuration
func TestAgent_Client_TelemetryConfiguration(t *testing.T) {
ci.Parallel(t)
assert := assert.New(t)
conf := DefaultConfig()
conf.DevMode = true
a := &Agent{config: conf}
c, err := a.clientConfig()
assert.Nil(err)
must.NoError(t, err)
telemetry := conf.Telemetry
assert.Equal(c.StatsCollectionInterval, telemetry.collectionInterval)
assert.Equal(c.PublishNodeMetrics, telemetry.PublishNodeMetrics)
assert.Equal(c.PublishAllocationMetrics, telemetry.PublishAllocationMetrics)
must.Eq(t, c.StatsCollectionInterval, telemetry.collectionInterval)
must.Eq(t, c.PublishNodeMetrics, telemetry.PublishNodeMetrics)
must.Eq(t, c.PublishAllocationMetrics, telemetry.PublishAllocationMetrics)
}
// TestAgent_HTTPCheck asserts Agent.agentHTTPCheck properly alters the HTTP

View File

@@ -609,6 +609,11 @@ type ServerConfig struct {
// for the EventBufferSize is 1.
EventBufferSize *int `hcl:"event_buffer_size"`
// JobMaxSourceSize limits the maximum size of a job's source (HCL or JSON)
// before it is discarded automatically. The value is a human-readable size
// string such as "1M". If unset, the maximum size defaults to 1 MB. If the
// value is zero, no job sources will be stored.
JobMaxSourceSize *string `hcl:"job_max_source_size"`
// LicensePath is the path to search for an enterprise license.
LicensePath string `hcl:"license_path"`
@@ -675,6 +680,7 @@ func (s *ServerConfig) Copy() *ServerConfig {
ns.PlanRejectionTracker = s.PlanRejectionTracker.Copy()
ns.EnableEventBroker = pointer.Copy(s.EnableEventBroker)
ns.EventBufferSize = pointer.Copy(s.EventBufferSize)
ns.JobMaxSourceSize = pointer.Copy(s.JobMaxSourceSize)
ns.licenseAdditionalPublicKeys = slices.Clone(s.licenseAdditionalPublicKeys)
ns.ExtraKeysHCL = slices.Clone(s.ExtraKeysHCL)
ns.Search = s.Search.Copy()
@@ -1060,7 +1066,7 @@ func (a *Addresses) Copy() *Addresses {
return &na
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// NormalizedAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
type NormalizedAddrs struct {
@@ -1309,6 +1315,7 @@ func DefaultConfig() *Config {
LimitResults: 100,
MinTermLength: 2,
},
JobMaxSourceSize: pointer.Of("1M"),
},
ACL: &ACLConfig{
Enabled: false,
@@ -1978,6 +1985,8 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.EventBufferSize = b.EventBufferSize
}
result.JobMaxSourceSize = pointer.Merge(s.JobMaxSourceSize, b.JobMaxSourceSize)
if b.PlanRejectionTracker != nil {
result.PlanRejectionTracker = result.PlanRejectionTracker.Merge(b.PlanRejectionTracker)
}

View File

@@ -111,7 +111,7 @@ func TestHTTP_DeploymentAllocations(t *testing.T) {
a2.TaskStates = make(map[string]*structs.TaskState)
a2.TaskStates["test"] = taskState2
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, nil, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a1, a2}), "UpsertAllocs")
@@ -178,7 +178,7 @@ func TestHTTP_DeploymentPause(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, nil, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")
// Create the pause request
@@ -219,7 +219,7 @@ func TestHTTP_DeploymentPromote(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, nil, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")
// Create the pause request
@@ -263,7 +263,7 @@ func TestHTTP_DeploymentAllocHealth(t *testing.T) {
a := mock.Alloc()
a.JobID = j.ID
a.DeploymentID = d.ID
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, nil, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}), "UpsertAllocs")
@@ -305,7 +305,7 @@ func TestHTTP_DeploymentFail(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, nil, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
// Make the HTTP request

View File

@@ -66,7 +66,7 @@ func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAl
// Upsert the allocation
state := agent.server.State()
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, nil, alloc.Job))
require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc}))
if wait == noWaitClientAlloc {

View File

@@ -107,6 +107,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/services"):
jobID := strings.TrimSuffix(path, "/services")
return s.jobServiceRegistrations(resp, req, jobID)
case strings.HasSuffix(path, "/submission"):
jobID := strings.TrimSuffix(path, "/submission")
return s.jobSubmissionCRUD(resp, req, jobID)
default:
return s.jobCRUD(resp, req, path)
}
@@ -330,6 +333,42 @@ func (s *HTTPServer) jobLatestDeployment(resp http.ResponseWriter, req *http.Req
return out.Deployment, nil
}
// jobSubmissionCRUD serves requests for the submission (original source) of a
// job. The "version" query parameter is required and must parse as a base-10
// unsigned integer, otherwise a 400 is returned. The version is validated
// before the method, so a malformed version yields 400 even for unsupported
// methods. Only GET is supported; any other method yields a 405.
func (s *HTTPServer) jobSubmissionCRUD(resp http.ResponseWriter, req *http.Request, jobID string) (*structs.JobSubmission, error) {
	rawVersion := req.URL.Query().Get("version")
	version, parseErr := strconv.ParseUint(rawVersion, 10, 64)
	if parseErr != nil {
		return nil, CodedError(400, "Unable to parse job submission version parameter")
	}

	if req.Method != http.MethodGet {
		return nil, CodedError(405, ErrInvalidMethod)
	}
	return s.jobSubmissionQuery(resp, req, jobID, version)
}
// jobSubmissionQuery looks up the submission stored for the given job ID and
// version via the Job.GetJobSubmission RPC. Query metadata from the RPC reply
// is written to the response before the result is inspected. A reply without
// a submission is reported as a 404.
func (s *HTTPServer) jobSubmissionQuery(resp http.ResponseWriter, req *http.Request, jobID string, version uint64) (*structs.JobSubmission, error) {
	var (
		request structs.JobSubmissionRequest
		reply   structs.JobSubmissionResponse
	)
	request.JobID = jobID
	request.Version = version

	// A true result from parse ends the request with no value and no error.
	// NOTE(review): presumably parse has already written the response in
	// that case — confirm against its definition.
	if done := s.parse(resp, req, &request.Region, &request.QueryOptions); done {
		return nil, nil
	}

	err := s.agent.RPC("Job.GetJobSubmission", &request, &reply)
	if err != nil {
		return nil, err
	}

	setMeta(resp, &reply.QueryMeta)

	if submission := reply.Submission; submission != nil {
		return submission, nil
	}
	return nil, CodedError(404, "job source not found")
}
func (s *HTTPServer) jobCRUD(resp http.ResponseWriter, req *http.Request, jobID string) (interface{}, error) {
switch req.Method {
case "GET":
@@ -413,8 +452,12 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, jobI
}
sJob, writeReq := s.apiJobAndRequestToStructs(args.Job, req, args.WriteRequest)
submission := apiJobSubmissionToStructs(args.Submission)
regReq := structs.JobRegisterRequest{
Job: sJob,
Job: sJob,
Submission: submission,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
@@ -743,10 +786,14 @@ func (s *HTTPServer) JobsParseRequest(resp http.ResponseWriter, req *http.Reques
jobStruct, err = jobspec.Parse(strings.NewReader(args.JobHCL))
} else {
jobStruct, err = jobspec2.ParseWithConfig(&jobspec2.ParseConfig{
Path: "input.hcl",
Body: []byte(args.JobHCL),
AllowFS: false,
Path: "input.hcl",
Body: []byte(args.JobHCL),
AllowFS: false,
VarContent: args.Variables,
})
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Failed to parse job: %v", err))
}
}
if err != nil {
return nil, CodedError(400, err.Error())
@@ -790,6 +837,18 @@ func (s *HTTPServer) jobServiceRegistrations(resp http.ResponseWriter, req *http
return reply.Services, nil
}
// apiJobSubmissionToStructs converts an api.JobSubmission into its
// structs.JobSubmission equivalent, copying the fields one to one. A nil
// input yields a nil result. The VariableFlags map is assigned directly, so
// the result shares it with the input rather than holding a copy.
func apiJobSubmissionToStructs(submission *api.JobSubmission) *structs.JobSubmission {
	if submission == nil {
		return nil
	}

	out := new(structs.JobSubmission)
	out.Source = submission.Source
	out.Format = submission.Format
	out.VariableFlags = submission.VariableFlags
	out.Variables = submission.Variables
	return out
}
// apiJobAndRequestToStructs parses the query params from the incoming
// request and converts to a structs.Job and WriteRequest with the
func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request, apiReq api.WriteRequest) (*structs.Job, *structs.WriteRequest) {

View File

@@ -387,31 +387,45 @@ func TestHTTP_JobsParse(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
buf := encodeReq(api.JobsParseRequest{JobHCL: mock.HCL()})
req, err := http.NewRequest("POST", "/v1/jobs/parse", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
respW := httptest.NewRecorder()
obj, err := s.Server.JobsParseRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if obj == nil {
t.Fatal("response should not be nil")
}
must.NoError(t, err)
must.NotNil(t, obj)
job := obj.(*api.Job)
expected := mock.Job()
if job.Name == nil || *job.Name != expected.Name {
t.Fatalf("job name is '%s', expected '%s'", *job.Name, expected.Name)
}
must.Eq(t, expected.Name, *job.Name)
must.Eq(t, expected.Datacenters[0], job.Datacenters[0])
})
}
if job.Datacenters == nil ||
job.Datacenters[0] != expected.Datacenters[0] {
t.Fatalf("job datacenters is '%s', expected '%s'",
job.Datacenters[0], expected.Datacenters[0])
}
// TestHTTP_JobsParse_HCLVar posts a job specification together with variable
// content to the jobs parse endpoint and checks that the parsed job name and
// task config match the expected values.
func TestHTTP_JobsParse_HCLVar(t *testing.T) {
	ci.Parallel(t)

	httpTest(t, nil, func(s *TestAgent) {
		jobHCL, varHCL := mock.HCLVar()
		parseReq := api.JobsParseRequest{
			JobHCL:    jobHCL,
			Variables: varHCL,
		}

		req, err := http.NewRequest("POST", "/v1/jobs/parse", encodeReq(parseReq))
		must.NoError(t, err)

		recorder := httptest.NewRecorder()
		obj, err := s.Server.JobsParseRequest(recorder, req)
		must.NoError(t, err)
		must.NotNil(t, obj)

		parsed := obj.(*api.Job)
		must.Eq(t, "var-job", *parsed.Name)

		// The expected config of the first task of the first group.
		wantConfig := map[string]any{
			"command": "echo",
			"args":    []any{"S is stringy, N is 42, B is true"},
		}
		must.Eq(t, wantConfig, parsed.TaskGroups[0].Tasks[0].Config)
	})
}
@@ -598,7 +612,7 @@ func TestHTTP_JobQuery_Payload(t *testing.T) {
// Directly manipulate the state
state := s.Agent.server.State()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job); err != nil {
t.Fatalf("Failed to upsert job: %v", err)
}
@@ -1608,6 +1622,49 @@ func TestHTTP_JobVersions(t *testing.T) {
})
}
// TestHTTP_JobSubmission registers a job together with its HCL source and
// then exercises the job submission HTTP handler: the stored version is
// returned, an unknown version reports "job source not found", and a
// non-GET method is rejected.
func TestHTTP_JobSubmission(t *testing.T) {
	ci.Parallel(t)

	httpTest(t, nil, func(s *TestAgent) {
		job := mock.Job()

		registerReq := structs.JobRegisterRequest{
			Job: job,
			Submission: &structs.JobSubmission{
				Source: mock.HCL(),
				Format: "hcl2",
			},
			WriteRequest: structs.WriteRequest{
				Region:    "global",
				Namespace: structs.DefaultNamespace,
			},
		}
		var registerResp structs.JobRegisterResponse
		must.NoError(t, s.Agent.RPC("Job.Register", &registerReq, &registerResp))

		// One recorder is shared by all three requests below.
		recorder := httptest.NewRecorder()
		endpoint := "/v1/job/" + job.ID + "/submission"

		// Version 0 was registered above, so its submission must be found.
		req, err := http.NewRequest(http.MethodGet, endpoint+"?version=0", nil)
		must.NoError(t, err)
		got, err := s.Server.jobSubmissionCRUD(recorder, req, job.ID)
		must.NoError(t, err)
		must.Eq(t, "hcl2", got.Format)
		must.StrContains(t, got.Source, `job "my-job" {`)

		// Version 1 was never registered.
		req, err = http.NewRequest(http.MethodGet, endpoint+"?version=1", nil)
		must.NoError(t, err)
		_, err = s.Server.jobSubmissionCRUD(recorder, req, job.ID)
		must.ErrorContains(t, err, "job source not found")

		// POST is not a supported method for this endpoint.
		req, err = http.NewRequest(http.MethodPost, endpoint+"?version=0", nil)
		must.NoError(t, err)
		_, err = s.Server.jobSubmissionCRUD(recorder, req, job.ID)
		must.ErrorContains(t, err, "Invalid method")
	})
}
func TestHTTP_PeriodicForce(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
@@ -2267,7 +2324,7 @@ func TestHTTPServer_jobServiceRegistrations(t *testing.T) {
// Generate a job and upsert this.
job := mock.Job()
require.NoError(t, testState.UpsertJob(structs.MsgTypeTestSetup, 10, job))
require.NoError(t, testState.UpsertJob(structs.MsgTypeTestSetup, 10, nil, job))
// Generate a service registration, assigned the jobID to the
// mocked jobID, and upsert this.
@@ -2301,7 +2358,7 @@ func TestHTTPServer_jobServiceRegistrations(t *testing.T) {
// Generate a job and upsert this.
job := mock.Job()
require.NoError(t, testState.UpsertJob(structs.MsgTypeTestSetup, 10, job))
require.NoError(t, testState.UpsertJob(structs.MsgTypeTestSetup, 10, nil, job))
// Build the HTTP request.
path := fmt.Sprintf("/v1/job/%s/services", job.ID)
@@ -3635,6 +3692,30 @@ func TestConversion_apiResourcesToStructs(t *testing.T) {
}
}
// TestConversion_apiJobSubmissionToStructs covers both paths of
// apiJobSubmissionToStructs: a nil input must produce nil, and a populated
// input must produce a structs.JobSubmission with identical field values.
func TestConversion_apiJobSubmissionToStructs(t *testing.T) {
	ci.Parallel(t)

	t.Run("nil", func(t *testing.T) {
		must.Nil(t, apiJobSubmissionToStructs(nil))
	})

	t.Run("not nil", func(t *testing.T) {
		input := &api.JobSubmission{
			Source:        "source",
			Format:        "hcl2",
			VariableFlags: map[string]string{"foo": "bar"},
			Variables:     "variable",
		}
		want := &structs.JobSubmission{
			Source:        "source",
			Format:        "hcl2",
			VariableFlags: map[string]string{"foo": "bar"},
			Variables:     "variable",
		}
		must.Eq(t, want, apiJobSubmissionToStructs(input))
	})
}
func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
ci.Parallel(t)
require.Nil(t, apiConnectSidecarTaskToStructs(nil))

View File

@@ -25,7 +25,7 @@ func createJobForTest(jobID string, s *TestAgent, t *testing.T) {
job.ID = jobID
job.TaskGroups[0].Count = 1
state := s.Agent.server.State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
require.NoError(t, err)
}
@@ -61,7 +61,7 @@ func createCmdJobForTest(name, cmd string, s *TestAgent, t *testing.T) *structs.
job.TaskGroups[0].Tasks[0].Config["command"] = cmd
job.TaskGroups[0].Count = 1
state := s.Agent.server.State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
require.NoError(t, err)
return job
}