actions: minor bug fixes and improvements (#18904)

This commit is contained in:
Luiz Aoqui
2023-10-31 17:06:02 -04:00
committed by GitHub
parent 2bff6d2a6a
commit 3ddf1ecf1d
12 changed files with 254 additions and 108 deletions

View File

@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
@@ -25,6 +26,7 @@ const (
type execSession struct {
client *Client
alloc *Allocation
job string
task string
tty bool
command []string
@@ -101,7 +103,7 @@ func (s *execSession) startConnection() (*websocket.Conn, error) {
q.Params["action"] = s.action
q.Params["allocID"] = s.alloc.ID
q.Params["group"] = s.alloc.TaskGroup
reqPath = fmt.Sprintf("/v1/job/%s/action", s.alloc.JobID)
reqPath = fmt.Sprintf("/v1/job/%s/action", url.PathEscape(s.job))
}
var conn *websocket.Conn

View File

@@ -1520,7 +1520,7 @@ type EvalOptions struct {
// ActionExec is used to run a pre-defined command inside a running task.
// The call blocks until command terminates (or an error occurs), and returns the exit code.
func (j *Jobs) ActionExec(ctx context.Context,
alloc *Allocation, task string, tty bool, command []string,
alloc *Allocation, job string, task string, tty bool, command []string,
action string,
stdin io.Reader, stdout, stderr io.Writer,
terminalSizeCh <-chan TerminalSize, q *QueryOptions) (exitCode int, err error) {
@@ -1528,6 +1528,7 @@ func (j *Jobs) ActionExec(ctx context.Context,
s := &execSession{
client: j.client,
alloc: alloc,
job: job,
task: task,
tty: tty,
command: command,

View File

@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/armon/go-metrics"
@@ -248,17 +249,26 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e
return pointer.Of(int64(400)), taskNotPresentErr
}
if req.JobID != "" && req.JobID != alloc.JobID {
return pointer.Of(int64(http.StatusBadRequest)),
fmt.Errorf("job %s does not have allocation %s", req.JobID, req.AllocID)
}
// If an action is present, go find the command and args
if req.Action != "" {
alloc, _ := a.c.GetAlloc(req.AllocID)
jobAction, err := validateActionExists(req.Action, req.Task, alloc)
if err != nil {
return pointer.Of(int64(400)), err
task := alloc.LookupTask(req.Task)
if task == nil {
return pointer.Of(int64(http.StatusBadRequest)),
fmt.Errorf("task %s not found in allocation %s", req.Task, alloc.ID)
}
if jobAction != nil {
// append both Command and Args
req.Cmd = append([]string{jobAction.Command}, jobAction.Args...)
jobAction := task.GetAction(req.Action)
if jobAction == nil {
return pointer.Of(int64(http.StatusBadRequest)),
fmt.Errorf("action %s not found in task %s", req.Action, req.Task)
}
// append both Command and Args
req.Cmd = append([]string{jobAction.Command}, jobAction.Args...)
}
if len(req.Cmd) == 0 {
@@ -357,14 +367,3 @@ func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) {
err := s.decoder.Decode(&req)
return &req, err
}
func validateActionExists(actionName string, taskName string, alloc *nstructs.Allocation) (*nstructs.Action, error) {
t := alloc.LookupTask(taskName)
for _, action := range t.Actions {
if action.Name == actionName {
return action, nil
}
}
return nil, fmt.Errorf("action %s not found", actionName)
}

View File

@@ -175,6 +175,9 @@ type StreamErrWrapper struct {
// AllocExecRequest is the initial request for execing into an Alloc task
type AllocExecRequest struct {
// JobID is the ID of the job requested
JobID string
// AllocID is the allocation to stream logs from
AllocID string

View File

@@ -14,7 +14,6 @@ import (
"syscall"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/helper/escapingio"
"github.com/posener/complete"
)
@@ -27,7 +26,7 @@ type ActionCommand struct {
Stderr io.WriteCloser
}
func (l *ActionCommand) Help() string {
func (c *ActionCommand) Help() string {
helpText := `
Usage: nomad action [options] <action>
@@ -49,7 +48,7 @@ Action Specific Options:
-job <job-id>
Specifies the job in which the Action is defined
-allocation <allocation-id>
-alloc <allocation-id>
Specifies the allocation in which the Action is defined. If not provided,
a group and task name must be provided and a random allocation will be
selected from the job.
@@ -58,90 +57,86 @@ Action Specific Options:
Specifies the task in which the Action is defined. Required if no
allocation is provided.
-group <group-name>
-group=<group-name>
Specifies the group in which the Action is defined. Required if no
allocation is provided.
-i
Pass stdin to the container, defaults to true. Pass -i=false to disable.
Pass stdin to the container, defaults to true. Pass -i=false to disable.
-t
Allocate a pseudo-tty, defaults to true if stdin is detected to be a tty session.
Pass -t=false to disable explicitly.
-e <escape_char>
Sets the escape character for sessions with a pty (default: '~'). The escape
character is only recognized at the beginning of a line. The escape character
followed by a dot ('.') closes the connection. Setting the character to
Sets the escape character for sessions with a pty (default: '~'). The escape
character is only recognized at the beginning of a line. The escape character
followed by a dot ('.') closes the connection. Setting the character to
'none' disables any escapes and makes the session fully transparent.
`
`
return strings.TrimSpace(helpText)
}
func (l *ActionCommand) Synopsis() string {
func (c *ActionCommand) Synopsis() string {
return "Run a pre-defined action from a Nomad task"
}
func (l *ActionCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(l.Meta.AutocompleteFlags(FlagSetClient),
func (c *ActionCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-task": complete.PredictAnything,
"-job": complete.PredictAnything,
"-allocation": complete.PredictAnything,
"-job": complete.PredictAnything,
"-alloc": complete.PredictAnything,
"-task": complete.PredictAnything,
"-group": complete.PredictAnything,
"-i": complete.PredictNothing,
"-t": complete.PredictNothing,
"-e": complete.PredictAnything,
})
}
func (l *ActionCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := l.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Allocs, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Allocs]
})
func (c *ActionCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (l *ActionCommand) Name() string { return "action" }
func (c *ActionCommand) Name() string { return "action" }
func (l *ActionCommand) Run(args []string) int {
func (c *ActionCommand) Run(args []string) int {
var stdinOpt, ttyOpt bool
var task, allocation, job, group, escapeChar string
flags := l.Meta.FlagSet(l.Name(), FlagSetClient)
flags.Usage = func() { l.Ui.Output(l.Help()) }
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&task, "task", "", "")
flags.StringVar(&group, "group", "", "")
flags.StringVar(&allocation, "allocation", "", "")
flags.StringVar(&allocation, "alloc", "", "")
flags.StringVar(&job, "job", "", "")
flags.BoolVar(&stdinOpt, "i", true, "")
flags.BoolVar(&ttyOpt, "t", isTty(), "")
flags.StringVar(&escapeChar, "e", "~", "")
if err := flags.Parse(args); err != nil {
l.Ui.Error(fmt.Sprintf("Error parsing flags: %s", err))
c.Ui.Error(fmt.Sprintf("Error parsing flags: %s", err))
return 1
}
args = flags.Args()
if len(args) < 1 {
l.Ui.Error("An action name is required")
c.Ui.Error("An action name is required")
c.Ui.Error(commandErrorText(c))
return 1
}
if job == "" {
l.Ui.Error("A job ID is required")
c.Ui.Error("A job ID is required")
c.Ui.Error(commandErrorText(c))
return 1
}
if ttyOpt && !stdinOpt {
l.Ui.Error("-i must be enabled if running with tty")
c.Ui.Error("-i must be enabled if running with tty")
c.Ui.Error(commandErrorText(c))
return 1
}
@@ -150,13 +145,14 @@ func (l *ActionCommand) Run(args []string) int {
}
if len(escapeChar) > 1 {
l.Ui.Error("-e requires 'none' or a single character")
c.Ui.Error("-e requires 'none' or a single character")
c.Ui.Error(commandErrorText(c))
return 1
}
client, err := l.Meta.Client()
client, err := c.Meta.Client()
if err != nil {
l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
c.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
return 1
}
@@ -167,41 +163,43 @@ func (l *ActionCommand) Run(args []string) int {
// Group param cannot be empty if allocation is empty,
// since we'll need to get a random allocation from the group
if group == "" {
l.Ui.Error("A group name is required if no allocation is provided")
c.Ui.Error("A group name is required if no allocation is provided")
c.Ui.Error(commandErrorText(c))
return 1
}
if task == "" {
l.Ui.Error("A task name is required if no allocation is provided")
c.Ui.Error("A task name is required if no allocation is provided")
c.Ui.Error(commandErrorText(c))
return 1
}
jobID, ns, err := l.JobIDByPrefix(client, job, nil)
jobID, ns, err := c.JobIDByPrefix(client, job, nil)
if err != nil {
l.Ui.Error(err.Error())
c.Ui.Error(err.Error())
return 1
}
allocStub, err = getRandomJobAlloc(client, jobID, group, ns)
if err != nil {
l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err))
c.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err))
return 1
}
} else {
allocs, _, err := client.Allocations().PrefixList(sanitizeUUIDPrefix(allocation))
if err != nil {
l.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err))
c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err))
return 1
}
if len(allocs) == 0 {
l.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocation))
c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocation))
return 1
}
if len(allocs) > 1 {
out := formatAllocListStubs(allocs, false, shortId)
l.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out))
c.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out))
return 1
}
@@ -211,7 +209,7 @@ func (l *ActionCommand) Run(args []string) int {
q := &api.QueryOptions{Namespace: allocStub.Namespace}
alloc, _, err := client.Allocations().Info(allocStub.ID, q)
if err != nil {
l.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
return 1
}
@@ -221,31 +219,31 @@ func (l *ActionCommand) Run(args []string) int {
task, err = lookupAllocTask(alloc)
}
if err != nil {
l.Ui.Error(err.Error())
c.Ui.Error(err.Error())
return 1
}
if !stdinOpt {
l.Stdin = bytes.NewReader(nil)
c.Stdin = bytes.NewReader(nil)
}
if l.Stdin == nil {
l.Stdin = os.Stdin
if c.Stdin == nil {
c.Stdin = os.Stdin
}
if l.Stdout == nil {
l.Stdout = os.Stdout
if c.Stdout == nil {
c.Stdout = os.Stdout
}
if l.Stderr == nil {
l.Stderr = os.Stderr
if c.Stderr == nil {
c.Stderr = os.Stderr
}
action := args[0]
code, err := l.execImpl(client, alloc, task, job, action, ttyOpt, escapeChar, l.Stdin, l.Stdout, l.Stderr)
code, err := c.execImpl(client, alloc, task, job, action, ttyOpt, escapeChar, c.Stdin, c.Stdout, c.Stderr)
if err != nil {
l.Ui.Error(fmt.Sprintf("failed to exec into task: %v", err))
c.Ui.Error(fmt.Sprintf("failed to exec into task: %v", err))
return 1
}
@@ -253,7 +251,7 @@ func (l *ActionCommand) Run(args []string) int {
}
// execImpl invokes the Alloc Exec api call, it also prepares and restores terminal states as necessary.
func (l *ActionCommand) execImpl(client *api.Client, alloc *api.Allocation, task string, job string, action string, tty bool,
func (c *ActionCommand) execImpl(client *api.Client, alloc *api.Allocation, task string, job string, action string, tty bool,
escapeChar string, stdin io.Reader, stdout, stderr io.WriteCloser) (int, error) {
sizeCh := make(chan api.TerminalSize, 1)
@@ -312,6 +310,7 @@ func (l *ActionCommand) execImpl(client *api.Client, alloc *api.Allocation, task
}
}()
return client.Jobs().ActionExec(ctx,
alloc, task, tty, make([]string, 0), action, stdin, stdout, stderr, sizeCh, nil)
return client.Jobs().ActionExec(ctx, alloc, job, task,
tty, make([]string, 0), action,
stdin, stdout, stderr, sizeCh, nil)
}

View File

@@ -116,7 +116,8 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
jobID := strings.TrimSuffix(path, "/actions")
return s.jobActions(resp, req, jobID)
case strings.HasSuffix(path, "/action"):
return s.jobRunAction(resp, req)
jobID := strings.TrimSuffix(path, "/action")
return s.jobRunAction(resp, req, jobID)
default:
return s.jobCRUD(resp, req, path)
}
@@ -342,7 +343,7 @@ func (s *HTTPServer) jobLatestDeployment(resp http.ResponseWriter, req *http.Req
func (s *HTTPServer) jobActions(resp http.ResponseWriter, req *http.Request, jobID string) (any, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
args := structs.JobSpecificRequest{
@@ -362,16 +363,14 @@ func (s *HTTPServer) jobActions(resp http.ResponseWriter, req *http.Request, job
return out.Actions, nil
}
func (s *HTTPServer) jobRunAction(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
s.logger.Info("jobRunAction called")
// Build the request and parse the ACL token
func (s *HTTPServer) jobRunAction(resp http.ResponseWriter, req *http.Request, jobID string) (interface{}, error) {
task := req.URL.Query().Get("task")
action := req.URL.Query().Get("action")
allocID := req.URL.Query().Get("allocID")
// Build the request and parse the ACL token
var err error
isTTY := false
err := error(nil)
if tty := req.URL.Query().Get("tty"); tty != "" {
isTTY, err = strconv.ParseBool(tty)
if err != nil {
@@ -380,16 +379,15 @@ func (s *HTTPServer) jobRunAction(resp http.ResponseWriter, req *http.Request) (
}
args := cstructs.AllocExecRequest{
JobID: jobID,
Task: task,
Action: action,
AllocID: allocID,
Tty: isTTY,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
conn, err := s.wsUpgrader.Upgrade(resp, req, nil)
if err != nil {
return nil, fmt.Errorf("failed to upgrade connection: %v", err)
}

View File

@@ -32,6 +32,7 @@ var (
}
normalTaskKeys = append(commonTaskKeys,
"action",
"artifact",
"constraint",
"affinity",
@@ -46,7 +47,6 @@ var (
"kind",
"volume_mount",
"csi_plugin",
"actions",
)
sidecarTaskKeys = append(commonTaskKeys,

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/armon/go-metrics"
@@ -430,6 +431,37 @@ func (a *ClientAllocations) exec(conn io.ReadWriteCloser) {
return
}
if alloc.ClientTerminalStatus() {
handleStreamResultError(fmt.Errorf("exec not possible, client status of allocation %s is %s", alloc.ID, alloc.ClientStatus),
pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
// Handle job ID if requested.
if args.JobID != "" {
// Verify job exists.
job, err := snap.JobByID(nil, args.Namespace, args.JobID)
if err != nil {
handleStreamResultError(err,
pointer.Of(int64(http.StatusInternalServerError)), encoder)
return
}
if job == nil {
handleStreamResultError(
fmt.Errorf("job %s not found in namespace %s", args.JobID, args.Namespace),
pointer.Of(int64(http.StatusNotFound)), encoder)
return
}
// Verify requested allocation belongs to the job.
if args.JobID != alloc.JobID {
handleStreamResultError(
fmt.Errorf("job %s does not have allocation %s", args.JobID, alloc.ID),
pointer.Of(int64(http.StatusBadRequest)), encoder,
)
}
}
nodeID := alloc.NodeID
// Make sure Node is valid and new enough to support RPC

View File

@@ -25,6 +25,7 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@@ -1227,6 +1228,87 @@ func TestAlloc_ExecStreaming(t *testing.T) {
}
}
func TestAlloc_ExecStreaming_TerminalAlloc(t *testing.T) {
ci.Parallel(t)
// Start a Nomad server and a client.
s, cleanupS := TestServer(t, nil)
defer cleanupS()
// Wait for a cluster leader and the client to connect.
testutil.WaitForLeader(t, s.RPC)
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
must.NoError(t, err)
})
// Create an alloc with terminal status.
alloc := mock.BatchAlloc()
alloc.ClientStatus = nstructs.AllocClientStatusComplete
alloc.Job.TaskGroups[0].Count = 1
alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "20s",
"exec_command": map[string]interface{}{
"run_for": "1ms",
"stdout_string": "expected output",
"exit_code": 3,
},
}
// Upsert the job and allocation.
state := s.State()
err := state.UpsertJob(nstructs.MsgTypeTestSetup, 999, nil, alloc.Job)
must.NoError(t, err)
err = state.UpsertAllocs(nstructs.MsgTypeTestSetup, 1003, []*nstructs.Allocation{alloc})
must.NoError(t, err)
// Make the exec request.
req := &cstructs.AllocExecRequest{
AllocID: alloc.ID,
Task: alloc.Job.TaskGroups[0].Tasks[0].Name,
Tty: true,
Cmd: []string{"placeholder command"},
QueryOptions: nstructs.QueryOptions{Region: "global"},
}
// Get the handler.
handler, err := s.StreamingRpcHandler("Allocations.Exec")
must.Nil(t, err)
// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
frames := make(chan *drivers.ExecTaskStreamingResponseMsg)
// Start the handler.
go handler(p2)
go decodeFrames(t, p1, frames, errCh)
// Send the request.
encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
must.Nil(t, encoder.Encode(req))
timeout := time.NewTimer(3 * time.Second)
t.Cleanup(func() { timeout.Stop() })
OUTER:
for {
select {
case <-timeout.C:
t.Error("timed out before getting exit code")
case err := <-errCh:
must.ErrorContains(t, err, "exec not possible")
break OUTER
case <-frames:
}
}
}
func decodeFrames(t *testing.T, p1 net.Conn, frames chan<- *drivers.ExecTaskStreamingResponseMsg, errCh chan<- error) {
// Start the decoder
decoder := codec.NewDecoder(p1, nstructs.MsgpackHandle)

View File

@@ -7,7 +7,12 @@
package structs
import "slices"
import (
"errors"
"slices"
"github.com/hashicorp/go-multierror"
)
type Action struct {
Name string
@@ -47,3 +52,16 @@ func (a *Action) Equal(o *Action) bool {
a.Command == o.Command &&
slices.Equal(a.Args, o.Args)
}
func (a *Action) Validate() error {
if a == nil {
return nil
}
var mErr *multierror.Error
if a.Command == "" {
mErr = multierror.Append(mErr, errors.New("Missing command"))
}
return mErr.ErrorOrNil()
}

View File

@@ -338,19 +338,6 @@ func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta {
return c
}
func CopySliceActions(s []*Action) []*Action {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Action, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {

View File

@@ -7592,6 +7592,15 @@ func (t *Task) GetIdentity(name string) *WorkloadIdentity {
return nil
}
func (t *Task) GetAction(name string) *Action {
for _, a := range t.Actions {
if a.Name == name {
return a
}
}
return nil
}
// IdentityHandle returns a WorkloadIdentityHandle which is a pair of unique WI
// name and task name.
func (t *Task) IdentityHandle(identity *WorkloadIdentity) *WIHandle {
@@ -7632,7 +7641,7 @@ func (t *Task) Copy() *Task {
nt.Lifecycle = nt.Lifecycle.Copy()
nt.Identity = nt.Identity.Copy()
nt.Identities = helper.CopySlice(nt.Identities)
nt.Actions = CopySliceActions(nt.Actions)
nt.Actions = helper.CopySlice(nt.Actions)
if t.Artifacts != nil {
artifacts := make([]*TaskArtifact, 0, len(t.Artifacts))
@@ -7844,6 +7853,22 @@ func (t *Task) Validate(jobType string, tg *TaskGroup) error {
}
}
// Validate actions.
actions := make(map[string]bool)
for _, action := range t.Actions {
if err := action.Validate(); err != nil {
outer := fmt.Errorf("Action %s validation failed: %s", action.Name, err)
mErr.Errors = append(mErr.Errors, outer)
}
if handled, seen := actions[action.Name]; seen && !handled {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Action %s defined multiple times", action.Name))
actions[action.Name] = true
continue
}
actions[action.Name] = false
}
// Validate the dispatch payload block if there
if t.DispatchPayload != nil {
if err := t.DispatchPayload.Validate(); err != nil {