mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
api: new dispatch endpoint sends body as Payload (#24381)
this opens up dispatching parameterized jobs by systems
that do not allow modifying what http request body they send
e.g. these two things are equal:
POST '{"Payload": "'"$(base64 <<< "hello")"'"}' /v1/job/my-job/dispatch
POST 'hello' /v1/job/my-job/dispatch/payload
This commit is contained in:
3
.changelog/24312.txt
Normal file
3
.changelog/24312.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
api: new parameterized dispatch endpoint sends raw HTTP request body as Payload
|
||||
```
|
||||
@@ -5,6 +5,7 @@ package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"maps"
|
||||
"net/http"
|
||||
"slices"
|
||||
@@ -88,6 +89,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
|
||||
case strings.HasSuffix(path, "/dispatch"):
|
||||
jobID := strings.TrimSuffix(path, "/dispatch")
|
||||
return s.jobDispatchRequest(resp, req, jobID)
|
||||
case strings.HasSuffix(path, "/dispatch/payload"):
|
||||
jobID := strings.TrimSuffix(path, "/dispatch/payload")
|
||||
return s.jobDispatchPayloadRequest(resp, req, jobID)
|
||||
case strings.HasSuffix(path, "/versions"):
|
||||
jobID := strings.TrimSuffix(path, "/versions")
|
||||
return s.jobVersions(resp, req, jobID)
|
||||
@@ -896,6 +900,30 @@ func (s *HTTPServer) jobDispatchRequest(resp http.ResponseWriter, req *http.Requ
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) jobDispatchPayloadRequest(resp http.ResponseWriter, req *http.Request, jobID string) (interface{}, error) {
|
||||
if req.Method != http.MethodPut && req.Method != http.MethodPost {
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
|
||||
args := structs.JobDispatchRequest{}
|
||||
var err error
|
||||
args.JobID = jobID
|
||||
args.Payload, err = io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return nil, CodedError(400, err.Error())
|
||||
}
|
||||
|
||||
// this only parses query args and headers (not request body)
|
||||
s.parseWriteRequest(req, &args.WriteRequest)
|
||||
|
||||
var out structs.JobDispatchResponse
|
||||
if err := s.agent.RPC("Job.Dispatch", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setIndex(resp, out.Index)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// JobsParseRequest parses a hcl jobspec and returns a api.Job
|
||||
func (s *HTTPServer) JobsParseRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if req.Method != http.MethodPut && req.Method != http.MethodPost {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -2003,6 +2004,45 @@ func TestHTTP_JobDispatch(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_JobDispatchPayload(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
// Create the parameterized job
|
||||
job := mock.BatchJob()
|
||||
job.ParameterizedJob = &structs.ParameterizedJobConfig{
|
||||
Payload: "required",
|
||||
}
|
||||
|
||||
// Register the job
|
||||
var resp structs.JobRegisterResponse
|
||||
must.NoError(t, s.Agent.RPC("Job.Register",
|
||||
&structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
},
|
||||
}, &resp))
|
||||
|
||||
// Build the request
|
||||
url := "/v1/job/" + job.ID + "/dispatch/payload"
|
||||
body := bytes.NewReader([]byte("any body at all"))
|
||||
req, err := http.NewRequest(http.MethodPut, url, body)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Make the request
|
||||
respW := httptest.NewRecorder()
|
||||
obj, err := s.Server.JobSpecificRequest(respW, req)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, http.StatusOK, respW.Result().StatusCode)
|
||||
|
||||
// Check the response
|
||||
dispatch := obj.(structs.JobDispatchResponse)
|
||||
must.NotEq(t, "", dispatch.EvalID, must.Sprintf("expect EvalID in: %v", dispatch))
|
||||
must.NotEq(t, "", dispatch.DispatchedJobID, must.Sprintf("expect DispatchedJobID in: %v", dispatch))
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_JobRevert(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
|
||||
@@ -1813,6 +1813,55 @@ $ curl \
|
||||
}
|
||||
```
|
||||
|
||||
## Dispatch Job with raw Payload body
|
||||
|
||||
This endpoint dispatches a new instance of a parameterized job using the full
|
||||
request body as the `Payload` as described in [Dispatch Job](#dispatch-job).
|
||||
|
||||
| Method | Path | Produces |
|
||||
| ------ | ---------------------------------- | ------------------ |
|
||||
| `POST` | `/v1/job/:job_id/dispatch/payload` | `application/json` |
|
||||
|
||||
The table below shows this endpoint's support for
|
||||
[blocking queries](/nomad/api-docs#blocking-queries) and
|
||||
[required ACLs](/nomad/api-docs#acls).
|
||||
|
||||
| Blocking Queries | ACL Required |
|
||||
| ---------------- | ------------------------ |
|
||||
| `NO` | `namespace:dispatch-job` |
|
||||
|
||||
### Parameters
|
||||
|
||||
- `:job_id` `(string: <required>)` - Specifies the ID of the job. This is
|
||||
specified as part of the path.
|
||||
|
||||
### Sample Payload
|
||||
|
||||
```
|
||||
any HTTP request body, JSON or otherwise, becomes the dispatch Payload
|
||||
```
|
||||
|
||||
### Sample Request
|
||||
|
||||
```shell-session
|
||||
$ curl \
|
||||
--request POST \
|
||||
--data 'anything at all' \
|
||||
https://localhost:4646/v1/job/my-job/dispatch
|
||||
```
|
||||
|
||||
### Sample Response
|
||||
|
||||
```json
|
||||
{
|
||||
"DispatchedJobID": "param/dispatch-1730920906-81821d1f",
|
||||
"EvalCreateIndex": 179,
|
||||
"EvalID": "5e973383-8d59-3f33-4496-72112a882605",
|
||||
"Index": 179,
|
||||
"JobCreateIndex": 178
|
||||
}
|
||||
```
|
||||
|
||||
## Revert to older Job Version
|
||||
|
||||
This endpoint reverts the job to an older version.
|
||||
|
||||
Reference in New Issue
Block a user