Merge pull request #5633 from hashicorp/f-nomad-exec-parts-02-cli

nomad exec part 2: CLI
This commit is contained in:
Mahmood Ali
2019-05-15 12:50:42 -04:00
committed by GitHub
7 changed files with 684 additions and 26 deletions

339
command/alloc_exec.go Normal file
View File

@@ -0,0 +1,339 @@
package command
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"github.com/docker/docker/pkg/term"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type AllocExecCommand struct {
Meta
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
}
func (l *AllocExecCommand) Help() string {
helpText := `
Usage: nomad alloc exec [options] <allocation> <command>
Run command inside the environment of the given allocation and task.
General Options:
` + generalOptionsUsage() + `
Exec Specific Options:
-task <task-name>
Sets the task to exec command in
-job
Use a random allocation from the specified job ID.
-i
Pass stdin to the container, defaults to true. Pass -i=false to disable.
-t
Allocate a pseudo-tty, defaults to true if stdin is detected to be a tty session.
Pass -t=false to disable explicitly.
`
return strings.TrimSpace(helpText)
}
func (l *AllocExecCommand) Synopsis() string {
return "Execute commands in task"
}
func (c *AllocExecCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"--task": complete.PredictAnything,
"-job": complete.PredictAnything,
"-i": complete.PredictNothing,
"-t": complete.PredictNothing,
})
}
func (l *AllocExecCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := l.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Allocs, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Allocs]
})
}
func (l *AllocExecCommand) Name() string { return "alloc exec" }
func (l *AllocExecCommand) Run(args []string) int {
var job, stdinOpt, ttyOpt bool
var task string
flags := l.Meta.FlagSet(l.Name(), FlagSetClient)
flags.Usage = func() { l.Ui.Output(l.Help()) }
flags.BoolVar(&job, "job", false, "")
flags.StringVar(&task, "task", "", "")
flags.BoolVar(&stdinOpt, "i", true, "")
stdinTty := isStdinTty()
flags.BoolVar(&ttyOpt, "t", stdinTty, "")
if err := flags.Parse(args); err != nil {
return 1
}
args = flags.Args()
if ttyOpt && !stdinOpt {
l.Ui.Error("-i must be enabled if running with tty")
return -1
}
if numArgs := len(args); numArgs < 1 {
if job {
l.Ui.Error("A job ID is required")
} else {
l.Ui.Error("An allocation ID is required")
}
l.Ui.Error(commandErrorText(l))
return 1
} else if numArgs < 2 {
l.Ui.Error("A command is required")
l.Ui.Error(commandErrorText(l))
return 1
}
command := args[1:]
client, err := l.Meta.Client()
if err != nil {
l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
return 1
}
// If -job is specified, use random allocation, otherwise use provided allocation
allocID := args[0]
if job {
allocID, err = getRandomJobAlloc(client, args[0])
if err != nil {
l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err))
return 1
}
}
length := shortId
// Query the allocation info
if len(allocID) == 1 {
l.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters."))
return 1
}
allocID = sanitizeUUIDPrefix(allocID)
allocs, _, err := client.Allocations().PrefixList(allocID)
if err != nil {
l.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err))
return 1
}
if len(allocs) == 0 {
l.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID))
return 1
}
if len(allocs) > 1 {
// Format the allocs
out := formatAllocListStubs(allocs, false, length)
l.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out))
return 1
}
// Prefix lookup matched a single allocation
alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
if err != nil {
l.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
return 1
}
if task == "" {
task, err = lookupAllocTask(alloc)
if err != nil {
l.Ui.Error(err.Error())
l.Ui.Error("\nPlease specify the task.")
return 1
}
}
if err := validateTaskExistsInAllocation(task, alloc); err != nil {
l.Ui.Error(err.Error())
return 1
}
if l.Stdin == nil {
l.Stdin = os.Stdin
}
if l.Stdout == nil {
l.Stdout = os.Stdout
}
if l.Stderr == nil {
l.Stderr = os.Stderr
}
var stdin io.Reader = l.Stdin
if !stdinOpt {
stdin = bytes.NewReader(nil)
}
code, err := l.execImpl(client, alloc, task, ttyOpt, command, stdin, l.Stdout, l.Stderr)
if err != nil {
l.Ui.Error(fmt.Sprintf("failed to exec into task: %v", err))
return 1
}
return code
}
// execImpl invokes the Alloc Exec api call, it also prepares and restores terminal states as necessary.
func (l *AllocExecCommand) execImpl(client *api.Client, alloc *api.Allocation, task string, tty bool,
command []string, stdin io.Reader, stdout, stderr io.WriteCloser) (int, error) {
sizeCh := make(chan api.TerminalSize, 1)
// When tty, ensures we capture all user input and monitor terminal resizes.
if tty {
if stdin == nil {
return -1, fmt.Errorf("stdin is null")
}
inCleanup, err := setRawTerminal(stdin)
if err != nil {
return -1, err
}
defer inCleanup()
outCleanup, err := setRawTerminalOutput(stdout)
if err != nil {
return -1, err
}
defer outCleanup()
sizeCleanup, err := watchTerminalSize(stdout, sizeCh)
if err != nil {
return -1, err
}
defer sizeCleanup()
}
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
go func() {
for range signalCh {
cancelFn()
}
}()
return client.Allocations().Exec(ctx,
alloc, task, tty, command, stdin, stdout, stderr, sizeCh, nil)
}
func isStdinTty() bool {
_, isTerminal := term.GetFdInfo(os.Stdin)
return isTerminal
}
// setRawTerminal sets the stream terminal in raw mode, so process captures
// Ctrl+C and other commands to forward to remote process.
// It returns a cleanup function that restores terminal to original mode.
func setRawTerminal(stream interface{}) (cleanup func(), err error) {
fd, isTerminal := term.GetFdInfo(stream)
if !isTerminal {
return nil, errors.New("not a terminal")
}
state, err := term.SetRawTerminal(fd)
if err != nil {
return nil, err
}
return func() { term.RestoreTerminal(fd, state) }, nil
}
// setRawTerminalOutput sets the output stream in Windows to raw mode,
// so it disables LF -> CRLF translation.
// It's basically a no-op on unix.
func setRawTerminalOutput(stream interface{}) (cleanup func(), err error) {
fd, isTerminal := term.GetFdInfo(stream)
if !isTerminal {
return nil, errors.New("not a terminal")
}
state, err := term.SetRawTerminalOutput(fd)
if err != nil {
return nil, err
}
return func() { term.RestoreTerminal(fd, state) }, nil
}
// watchTerminalSize watches terminal size changes to propogate to remote tty.
func watchTerminalSize(out io.Writer, resize chan<- api.TerminalSize) (func(), error) {
fd, isTerminal := term.GetFdInfo(out)
if !isTerminal {
return nil, errors.New("not a terminal")
}
ctx, cancel := context.WithCancel(context.Background())
signalCh := make(chan os.Signal, 1)
setupWindowNotification(signalCh)
sendTerminalSize := func() {
s, err := term.GetWinsize(fd)
if err != nil {
return
}
resize <- api.TerminalSize{
Height: int(s.Height),
Width: int(s.Width),
}
}
go func() {
for {
select {
case <-ctx.Done():
return
case <-signalCh:
sendTerminalSize()
}
}
}()
go func() {
// send initial size
sendTerminalSize()
}()
return cancel, nil
}

287
command/alloc_exec_test.go Normal file
View File

@@ -0,0 +1,287 @@
package command
import (
"bytes"
"fmt"
"strings"
"testing"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// static check
var _ cli.Command = &AllocExecCommand{}
func TestAllocExecCommand_Fails(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()
cases := []struct {
name string
args []string
expectedError string
}{
{
"misuse",
[]string{"bad"},
commandErrorText(&AllocExecCommand{}),
},
{
"connection failure",
[]string{"-address=nope", "26470238-5CF2-438F-8772-DC67CFB0705C", "/bin/bash"},
"Error querying allocation",
},
{
"not found alloc",
[]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C", "/bin/bash"},
"No allocation(s) with prefix or id",
},
{
"not found job",
[]string{"-address=" + url, "-job", "example", "/bin/bash"},
`job "example" doesn't exist`,
},
{
"too short allocis",
[]string{"-address=" + url, "2", "/bin/bash"},
"Alloc ID must contain at least two characters",
},
{
"missing command",
[]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"},
"A command is required",
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
ui := new(cli.MockUi)
cmd := &AllocExecCommand{Meta: Meta{Ui: ui}}
code := cmd.Run(c.args)
require.Equal(t, 1, code)
require.Contains(t, ui.ErrorWriter.String(), c.expectedError)
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
})
}
// Wait for a node to be ready
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
for _, node := range nodes {
if _, ok := node.Drivers["mock_driver"]; ok &&
node.Status == structs.NodeStatusReady {
return true, nil
}
}
return false, fmt.Errorf("no ready nodes")
}, func(err error) {
require.NoError(t, err)
})
t.Run("non existent task", func(t *testing.T) {
ui := new(cli.MockUi)
cmd := &AllocExecCommand{Meta: Meta{Ui: ui}}
jobID := "job1_sfx"
job1 := testJob(jobID)
resp, _, err := client.Jobs().Register(job1, nil)
require.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
require.Zero(t, code, "status code not zero")
// get an alloc id
allocId1 := ""
if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil {
if len(allocs) > 0 {
allocId1 = allocs[0].ID
}
}
require.NotEmpty(t, allocId1, "unable to find allocation")
// by alloc
require.Equal(t, 1, cmd.Run([]string{"-address=" + url, "-task=nonexistenttask1", allocId1, "/bin/bash"}))
require.Contains(t, ui.ErrorWriter.String(), "Could not find task named: nonexistenttask1")
ui.ErrorWriter.Reset()
// by jobID
require.Equal(t, 1, cmd.Run([]string{"-address=" + url, "-task=nonexistenttask2", "-job", jobID, "/bin/bash"}))
require.Contains(t, ui.ErrorWriter.String(), "Could not find task named: nonexistenttask2")
ui.ErrorWriter.Reset()
})
}
func TestAllocExecCommand_AutocompleteArgs(t *testing.T) {
assert := assert.New(t)
t.Parallel()
srv, _, url := testServer(t, true, nil)
defer srv.Shutdown()
ui := new(cli.MockUi)
cmd := &AllocExecCommand{Meta: Meta{Ui: ui, flagAddress: url}}
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
prefix := a.ID[:5]
args := complete.Args{Last: prefix}
predictor := cmd.AutocompleteArgs()
res := predictor.Predict(args)
assert.Equal(1, len(res))
assert.Equal(a.ID, res[0])
}
func TestAllocExecCommand_Run(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()
// Wait for a node to be ready
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
for _, node := range nodes {
if _, ok := node.Drivers["mock_driver"]; ok &&
node.Status == structs.NodeStatusReady {
return true, nil
}
}
return false, fmt.Errorf("no ready nodes")
}, func(err error) {
require.NoError(t, err)
})
jobID := uuid.Generate()
job := testJob(jobID)
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
"exec_command": map[string]interface{}{
"run_for": "1ms",
"exit_code": 21,
"stdout_string": "sample stdout output\n",
"stderr_string": "sample stderr output\n",
},
}
resp, _, err := client.Jobs().Register(job, nil)
require.NoError(t, err)
evalUi := new(cli.MockUi)
code := waitForSuccess(evalUi, client, fullId, t, resp.EvalID)
require.Equal(t, 0, code, "failed to get status - output: %v", evalUi.ErrorWriter.String())
allocId := ""
testutil.WaitForResult(func() (bool, error) {
allocs, _, err := client.Jobs().Allocations(jobID, false, nil)
if err != nil {
return false, fmt.Errorf("failed to get allocations: %v", err)
}
if len(allocs) < 0 {
return false, fmt.Errorf("no allocations yet")
}
alloc := allocs[0]
if alloc.ClientStatus != "running" {
return false, fmt.Errorf("alloc is not running yet: %v", alloc.ClientStatus)
}
allocId = alloc.ID
return true, nil
}, func(err error) {
require.NoError(t, err)
})
cases := []struct {
name string
command string
stdin string
stdout string
stderr string
exitCode int
}{
{
name: "basic stdout/err",
command: "simplecommand",
stdin: "",
stdout: "sample stdout output",
stderr: "sample stderr output",
exitCode: 21,
},
{
name: "notty: streamining input",
command: "showinput",
stdin: "hello from stdin",
stdout: "TTY: false\nStdin:\nhello from stdin",
exitCode: 0,
},
}
for _, c := range cases {
t.Run("by id: "+c.name, func(t *testing.T) {
ui := new(cli.MockUi)
var stdout, stderr bufferCloser
cmd := &AllocExecCommand{
Meta: Meta{Ui: ui},
Stdin: strings.NewReader(c.stdin),
Stdout: &stdout,
Stderr: &stderr,
}
code = cmd.Run([]string{"-address=" + url, allocId, c.command})
assert.Equal(t, c.exitCode, code)
assert.Equal(t, c.stdout, strings.TrimSpace(stdout.String()))
assert.Equal(t, c.stderr, strings.TrimSpace(stderr.String()))
})
t.Run("by job: "+c.name, func(t *testing.T) {
ui := new(cli.MockUi)
var stdout, stderr bufferCloser
cmd := &AllocExecCommand{
Meta: Meta{Ui: ui},
Stdin: strings.NewReader(c.stdin),
Stdout: &stdout,
Stderr: &stderr,
}
code = cmd.Run([]string{"-address=" + url, "-job", jobID, c.command})
assert.Equal(t, c.exitCode, code)
assert.Equal(t, c.stdout, strings.TrimSpace(stdout.String()))
assert.Equal(t, c.stderr, strings.TrimSpace(stderr.String()))
})
}
}
type bufferCloser struct {
bytes.Buffer
}
func (b *bufferCloser) Close() error {
return nil
}

View File

@@ -0,0 +1,14 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package command
import (
"os"
"os/signal"
"golang.org/x/sys/unix"
)
func setupWindowNotification(ch chan<- os.Signal) {
signal.Notify(ch, unix.SIGWINCH)
}

View File

@@ -0,0 +1,9 @@
package command
import (
"os"
)
func setupWindowNotification(ch chan<- os.Signal) {
// do nothing
}

View File

@@ -1,6 +1,7 @@
package command
import (
"errors"
"fmt"
"io"
"os"
@@ -185,25 +186,10 @@ func (l *AllocLogsCommand) Run(args []string) int {
}
} else {
// Try to determine the tasks name from the allocation
var tasks []*api.Task
for _, tg := range alloc.Job.TaskGroups {
if *tg.Name == alloc.TaskGroup {
if len(tg.Tasks) == 1 {
task = tg.Tasks[0].Name
break
}
task, err = lookupAllocTask(alloc)
tasks = tg.Tasks
break
}
}
if task == "" {
l.Ui.Error(fmt.Sprintf("Allocation %q is running the following tasks:", limit(alloc.ID, length)))
for _, t := range tasks {
l.Ui.Error(fmt.Sprintf(" * %s", t.Name))
}
if err != nil {
l.Ui.Error(err.Error())
l.Ui.Error("\nPlease specify the task.")
return 1
}
@@ -294,3 +280,22 @@ func (l *AllocLogsCommand) followFile(client *api.Client, alloc *api.Allocation,
return r, nil
}
func lookupAllocTask(alloc *api.Allocation) (string, error) {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return "", fmt.Errorf("Could not find allocation task group: %s", alloc.TaskGroup)
}
if len(tg.Tasks) == 1 {
return tg.Tasks[0].Name, nil
}
var errStr strings.Builder
fmt.Fprintf(&errStr, "Allocation %q is running the following tasks:\n", limit(alloc.ID, shortId))
for _, t := range tg.Tasks {
fmt.Fprintf(&errStr, " * %s\n", t.Name)
}
fmt.Fprintf(&errStr, "\nPlease specify the task.")
return "", errors.New(errStr.String())
}

View File

@@ -128,21 +128,15 @@ func validateTaskExistsInAllocation(taskName string, alloc *api.Allocation) erro
return fmt.Errorf("Could not find allocation task group: %s", alloc.TaskGroup)
}
taskExists := false
foundTaskNames := make([]string, len(tg.Tasks))
for i, task := range tg.Tasks {
foundTaskNames[i] = task.Name
if task.Name == taskName {
taskExists = true
break
return nil
}
}
if !taskExists {
return fmt.Errorf("Could not find task named: %s, found:\n%s", taskName, formatList(foundTaskNames))
}
return nil
return fmt.Errorf("Could not find task named: %s, found:\n%s", taskName, formatList(foundTaskNames))
}
func (a *AllocRestartCommand) Synopsis() string {

View File

@@ -145,6 +145,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"alloc exec": func() (cli.Command, error) {
return &AllocExecCommand{
Meta: meta,
}, nil
},
"alloc signal": func() (cli.Command, error) {
return &AllocSignalCommand{
Meta: meta,
@@ -252,6 +257,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"exec": func() (cli.Command, error) {
return &AllocExecCommand{
Meta: meta,
}, nil
},
"executor": func() (cli.Command, error) {
return &ExecutorPluginCommand{
Meta: meta,