Files
nomad/api/allocations_exec.go
Phil Renaud 8902afe651 Nomad Actions (#18794)
* Scaffolding actions (#18639)

* Task-level actions for job submissions and retrieval

* FIXME: Temporary workaround to get ember dev server to pass exec through to 4646

* Update api/tasks.go

Co-authored-by: Tim Gross <tgross@hashicorp.com>

* Update command/agent/job_endpoint.go

Co-authored-by: Tim Gross <tgross@hashicorp.com>

* Diff and copy implementations

* Action structs get their own file, diff updates to behave like our other diffs

* Test to observe actions changes in a version update

* Tests migrated into structs/diff_test and modified with PR comments in mind

* APIActionToSTructsAction now returns a new value

* de-comment some plain parts, remove unused action lookup

* unused param in action converter

---------

Co-authored-by: Tim Gross <tgross@hashicorp.com>

* New endpoint: job/:id/actions (#18690)

* unused param in action converter

* backing out of parse_job level and moved toward new endpoint level

* Adds taskName and taskGroupName to actions at job level

* Unmodified job mock actions tests

* actionless job test

* actionless job test

* Multi group multi task actions test

* HTTP method check for GET, cleaner errors in job_endpoint_test

* decomment

* Actions aggregated at job model level (#18733)

* Removal of temporary fix to proxy to 4646

* Run Action websocket endpoint (#18760)

* Working demo for review purposes

* removal of cors passthru for websockets

* Remove job_endpoint-specific ws handlers and aimed at existing alloc exec handlers instead

* PR comments adressed, no need for taskGroup pass, better group and task lookups from alloc

* early return in action validate and removed jobid from req args per PR comments

* todo removal, we're checking later in the rpc

* boolean style change on tty

* Action CLI command (#18778)

* Action command init and stuck-notes

* Conditional reqpath to aim at Job action endpoint

* De-logged

* General CLI command cleanup, observe namespace, pass action as string, get random alloc w group adherence

* tab and varname cleanup

* Remove action param from Allocations().Exec calls

* changelog

* dont nil-check acl

---------

Co-authored-by: Tim Gross <tgross@hashicorp.com>
2023-10-20 13:05:55 -04:00

257 lines
5.3 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package api
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
// heartbeatInterval is the amount of time to wait between sending heartbeats
// during an exec streaming operation
heartbeatInterval = 10 * time.Second
)
type execSession struct {
client *Client
alloc *Allocation
task string
tty bool
command []string
action string
stdin io.Reader
stdout io.Writer
stderr io.Writer
terminalSizeCh <-chan TerminalSize
q *QueryOptions
}
func (s *execSession) run(ctx context.Context) (exitCode int, err error) {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
conn, err := s.startConnection()
if err != nil {
return -2, err
}
defer conn.Close()
sendErrCh := s.startTransmit(ctx, conn)
exitCh, recvErrCh := s.startReceiving(ctx, conn)
for {
select {
case <-ctx.Done():
return -2, ctx.Err()
case exitCode := <-exitCh:
return exitCode, nil
case recvErr := <-recvErrCh:
// drop websocket code, not relevant to user
if wsErr, ok := recvErr.(*websocket.CloseError); ok && wsErr.Text != "" {
return -2, errors.New(wsErr.Text)
}
return -2, recvErr
case sendErr := <-sendErrCh:
return -2, fmt.Errorf("failed to send input: %w", sendErr)
}
}
}
func (s *execSession) startConnection() (*websocket.Conn, error) {
// First, attempt to connect to the node directly, but may fail due to network isolation
// and network errors. Fallback to using server-side forwarding instead.
nodeClient, err := s.client.GetNodeClientWithTimeout(s.alloc.NodeID, ClientConnTimeout, s.q)
if err == NodeDownErr {
return nil, NodeDownErr
}
q := s.q
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
commandBytes, err := json.Marshal(s.command)
if err != nil {
return nil, fmt.Errorf("failed to marshal command: %W", err)
}
q.Params["tty"] = strconv.FormatBool(s.tty)
q.Params["task"] = s.task
q.Params["command"] = string(commandBytes)
reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", s.alloc.ID)
if s.action != "" {
q.Params["action"] = s.action
q.Params["allocID"] = s.alloc.ID
q.Params["group"] = s.alloc.TaskGroup
reqPath = fmt.Sprintf("/v1/job/%s/action", s.alloc.JobID)
}
var conn *websocket.Conn
if nodeClient != nil {
conn, _, _ = nodeClient.websocket(reqPath, q)
}
if conn == nil {
conn, _, err = s.client.websocket(reqPath, q)
if err != nil {
return nil, err
}
}
return conn, nil
}
func (s *execSession) startTransmit(ctx context.Context, conn *websocket.Conn) <-chan error {
// FIXME: Handle websocket send errors.
// Currently, websocket write failures are dropped. As sending and
// receiving are running concurrently, it's expected that some send
// requests may fail with connection errors when connection closes.
// Connection errors should surface in the receive paths already,
// but I'm unsure about one-sided communication errors.
var sendLock sync.Mutex
send := func(v *ExecStreamingInput) {
sendLock.Lock()
defer sendLock.Unlock()
conn.WriteJSON(v)
}
errCh := make(chan error, 4)
// propagate stdin
go func() {
bytes := make([]byte, 2048)
for {
if ctx.Err() != nil {
return
}
input := ExecStreamingInput{Stdin: &ExecStreamingIOOperation{}}
n, err := s.stdin.Read(bytes)
// always send data if we read some
if n != 0 {
input.Stdin.Data = bytes[:n]
send(&input)
}
// then handle error
if err == io.EOF {
// if n != 0, send data and we'll get n = 0 on next read
if n == 0 {
input.Stdin.Close = true
send(&input)
return
}
} else if err != nil {
errCh <- err
return
}
}
}()
// propagate terminal sizing updates
go func() {
for {
resizeInput := ExecStreamingInput{}
select {
case <-ctx.Done():
return
case size, ok := <-s.terminalSizeCh:
if !ok {
return
}
resizeInput.TTYSize = &size
send(&resizeInput)
}
}
}()
// send a heartbeat every 10 seconds
go func() {
t := time.NewTimer(heartbeatInterval)
defer t.Stop()
for {
t.Reset(heartbeatInterval)
select {
case <-ctx.Done():
return
case <-t.C:
// heartbeat message
send(&execStreamingInputHeartbeat)
}
}
}()
return errCh
}
func (s *execSession) startReceiving(ctx context.Context, conn *websocket.Conn) (<-chan int, <-chan error) {
exitCodeCh := make(chan int, 1)
errCh := make(chan error, 1)
go func() {
for ctx.Err() == nil {
// Decode the next frame
var frame ExecStreamingOutput
err := conn.ReadJSON(&frame)
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
errCh <- fmt.Errorf("websocket closed before receiving exit code: %w", err)
return
} else if err != nil {
errCh <- err
return
}
switch {
case frame.Stdout != nil:
if len(frame.Stdout.Data) != 0 {
s.stdout.Write(frame.Stdout.Data)
}
// don't really do anything if stdout is closing
case frame.Stderr != nil:
if len(frame.Stderr.Data) != 0 {
s.stderr.Write(frame.Stderr.Data)
}
// don't really do anything if stderr is closing
case frame.Exited && frame.Result != nil:
exitCodeCh <- frame.Result.ExitCode
return
default:
// noop - heartbeat
}
}
}()
return exitCodeCh, errCh
}