e2e: add e2e tests for job submission api (#16841)

* e2e: add e2e tests for job submission api

* e2e: fixup callers of AllocLogs

* fix typo
Author: Seth Hoenig
Date: 2023-04-12 08:36:17 -05:00
Committed by: GitHub
Parent: 313ab8a681
Commit: a1ebd075c4
10 changed files with 419 additions and 17 deletions

View File

@@ -67,7 +67,7 @@ func dumpLogs(pluginIDs []string) error {
}
for _, alloc := range allocs {
allocID := alloc["ID"]
out, err := e2e.AllocLogs(allocID, e2e.LogsStdErr)
out, err := e2e.AllocLogs(allocID, "", e2e.LogsStdErr)
if err != nil {
return fmt.Errorf("could not get logs for alloc: %v\n%s", err, out)
}

View File

@@ -7,12 +7,16 @@ import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"
api "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)
// AllocsByName sorts allocs by Name
@@ -65,11 +69,38 @@ func WaitForAllocStatusComparison(query func() ([]string, error), comparison fun
return err
}
// SingleAllocID returns the ID of the first allocation found for jobID in namespace
// at the specified job version number. It retries for up to ten seconds and fails
// the test if no matching allocation is found.
//
// Should only be used with jobs containing a single task group.
func SingleAllocID(t *testing.T, jobID, namespace string, version int) string {
var id string
f := func() error {
allocations, err := AllocsForJob(jobID, namespace)
if err != nil {
return err
}
for _, m := range allocations {
if m["Version"] == strconv.Itoa(version) {
id = m["ID"]
return nil
}
}
return fmt.Errorf("id not found for %s/%s/%d", namespace, jobID, version)
}
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(f),
wait.Timeout(10*time.Second),
wait.Gap(1*time.Second),
))
return id
}
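A minimal usage sketch, mirroring how the new tests below call this helper; jobID and the nomad client are assumed to come from the surrounding test:
// Resolve the single allocation created by version 0 of the job in the
// default namespace, then wait for it to finish running.
allocID := e2eutil.SingleAllocID(t, jobID, "default", 0)
_ = e2eutil.WaitForAllocStopped(t, nomad, allocID)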
// AllocsForJob returns a slice of key->value maps, each describing the values
// of the 'nomad job status' Allocations section (not actual
// structs.Allocation objects; query the API if you want those).
func AllocsForJob(jobID, ns string) ([]map[string]string, error) {
var nsArg = []string{}
if ns != "" {
nsArg = []string{"-namespace", ns}
@@ -234,11 +265,14 @@ const (
LogsStdOut
)
func AllocLogs(allocID string, logStream LogStream) (string, error) {
func AllocLogs(allocID, namespace string, logStream LogStream) (string, error) {
cmd := []string{"nomad", "alloc", "logs"}
if logStream == LogsStdErr {
cmd = append(cmd, "-stderr")
}
if namespace != "" {
cmd = append(cmd, "-namespace", namespace)
}
cmd = append(cmd, allocID)
return Command(cmd[0], cmd[1:]...)
}
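Call sites now pass the namespace explicitly, with an empty string meaning the CLI default; a sketch based on the updated callers in this change (myNamespaceName comes from the ACL test below):
// Default namespace: pass "" so no -namespace flag is added.
out, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
must.NoError(t, err)

// Namespaced job: the namespace is forwarded as 'nomad alloc logs -namespace <ns>'.
out, err = e2eutil.AllocLogs(allocID, myNamespaceName, e2eutil.LogsStdOut)
must.NoError(t, err)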

View File

@@ -21,9 +21,18 @@ import (
// Register registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func Register(jobID, jobFilePath string) error {
_, err := RegisterGetOutput(jobID, jobFilePath)
return err
}
// RegisterGetOutput registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
// Also returns the CLI output from running 'job run'.
func RegisterGetOutput(jobID, jobFilePath string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return execCmd(jobID, jobFilePath, exec.CommandContext(ctx, "nomad", "job", "run", "-detach", "-"))
b, err := execCmd(jobID, jobFilePath, exec.CommandContext(ctx, "nomad", "job", "run", "-detach", "-"))
return string(b), err
}
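A usage sketch, assuming the e2eutil import used by the tests below; the asserted warning substring is only an example drawn from the max-size test:
// Register the job and keep the 'nomad job run' output so the caller can
// assert on warnings printed by the CLI.
output, err := e2eutil.RegisterGetOutput(jobID, jobFilePath)
must.NoError(t, err)
must.StrContains(t, output, "exceeds maximum")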
// RegisterWithArgs registers a jobspec from a file but with a unique ID. The
@@ -36,7 +45,8 @@ func RegisterWithArgs(jobID, jobFilePath string, args ...string) error {
baseArgs = append(baseArgs, "-")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return execCmd(jobID, jobFilePath, exec.CommandContext(ctx, "nomad", baseArgs...))
_, err := execCmd(jobID, jobFilePath, exec.CommandContext(ctx, "nomad", baseArgs...))
return err
}
// Revert reverts the job to the given version.
@@ -44,18 +54,19 @@ func Revert(jobID, jobFilePath string, version int) error {
args := []string{"job", "revert", "-detach", jobID, strconv.Itoa(version)}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return execCmd(jobID, jobFilePath, exec.CommandContext(ctx, "nomad", args...))
_, err := execCmd(jobID, jobFilePath, exec.CommandContext(ctx, "nomad", args...))
return err
}
func execCmd(jobID, jobFilePath string, cmd *exec.Cmd) error {
func execCmd(jobID, jobFilePath string, cmd *exec.Cmd) ([]byte, error) {
stdin, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("could not open stdin?: %w", err)
return nil, fmt.Errorf("could not open stdin?: %w", err)
}
content, err := os.ReadFile(jobFilePath)
if err != nil {
return fmt.Errorf("could not open job file: %w", err)
return nil, fmt.Errorf("could not open job file: %w", err)
}
// hack off the job block to replace with our unique ID
@@ -72,9 +83,9 @@ func execCmd(jobID, jobFilePath string, cmd *exec.Cmd) error {
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("could not register job: %w\n%v", err, string(out))
return out, fmt.Errorf("could not register job: %w\n%v", err, string(out))
}
return nil
return out, nil
}
// PeriodicForce forces a periodic job to dispatch
@@ -258,6 +269,20 @@ func CleanupJobsAndGC(t *testing.T, jobIDs *[]string) func() {
}
}
// MaybeCleanupJobsAndGC stops and purges the list of jobIDs and runs a
// system gc. It returns a func so the return value can be passed directly
// to t.Cleanup. Similar to CleanupJobsAndGC, but this one does not assert
// on a successful stop and gc, which is useful for tests that stop and gc
// the jobs themselves but still want a backup cleanup just in case.
func MaybeCleanupJobsAndGC(jobIDs *[]string) func() {
return func() {
for _, jobID := range *jobIDs {
_ = StopJob(jobID, "-purge", "-detach")
}
_, _ = Command("nomad", "system", "gc")
}
}
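Intended to be handed straight to t.Cleanup, as the new job submission tests do:
// Best-effort cleanup: stop and purge the registered jobs even if the test
// already removed them itself.
jobIDs := []string{jobID}
t.Cleanup(e2eutil.MaybeCleanupJobsAndGC(&jobIDs))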
// CleanupJobsAndGCWithContext stops and purges the list of jobIDs and runs a
// system gc. The passed context allows callers to cancel the execution of the
// cleanup as they desire. This is useful for tests which attempt to remove the

View File

@@ -39,7 +39,7 @@ func testExecUsesChroot(t *testing.T) {
e2eutil.WaitForAllocsStopped(t, nomad, []string{allocID})
// assert log contents
logs, err := e2eutil.AllocLogs(allocID, e2eutil.LogsStdOut)
logs, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
must.NoError(t, err)
must.RegexMatch(t, regexp.MustCompile(`(?m:^/alloc\b)`), logs)
must.RegexMatch(t, regexp.MustCompile(`(?m:^/local\b)`), logs)
@@ -63,7 +63,7 @@ func testImageUsesChroot(t *testing.T) {
e2eutil.WaitForAllocsStopped(t, nomad, []string{allocID})
// assert log contents
logs, err := e2eutil.AllocLogs(allocID, e2eutil.LogsStdOut)
logs, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
must.NoError(t, err)
must.RegexMatch(t, regexp.MustCompile(`(?m:^/alloc\b)`), logs)
must.RegexMatch(t, regexp.MustCompile(`(?m:^/local\b)`), logs)

View File

@@ -63,7 +63,7 @@ func (tc *PIDsNamespacing) TestIsolation_ExecDriver_PIDNamespacing(f *framework.
allocID := allocs[0].ID
e2eutil.WaitForAllocStopped(t, tc.Nomad(), allocID)
out, err := e2eutil.AllocLogs(allocID, e2eutil.LogsStdOut)
out, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
require.NoError(t, err, fmt.Sprintf("could not get logs for alloc %s", allocID))
require.Contains(t, out, "my pid is 1\n")
@@ -93,7 +93,7 @@ func (tc *PIDsNamespacing) TestIsolation_ExecDriver_PIDNamespacing_host(f *frame
allocID := allocs[0].ID
e2eutil.WaitForAllocStopped(t, tc.Nomad(), allocID)
out, err := e2eutil.AllocLogs(allocID, e2eutil.LogsStdOut)
out, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
require.NoError(t, err, fmt.Sprintf("could not get logs for alloc %s", allocID))
require.NotContains(t, out, "my pid is 1\n")
@@ -293,7 +293,7 @@ func (tc *PIDsNamespacing) TestIsolation_RawExecDriver_NoPIDNamespacing(f *frame
allocID := allocs[0].ID
e2eutil.WaitForAllocStopped(t, tc.Nomad(), allocID)
out, err := e2eutil.AllocLogs(allocID, e2eutil.LogsStdOut)
out, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
require.NoError(t, err, fmt.Sprintf("could not get logs for alloc %s", allocID))
var pid uint64

View File

@@ -0,0 +1,3 @@
// Package jobsubmissions contains e2e tests related to the /v1/job/<id>/submission
// HTTP API endpoint and related components.
package jobsubmissions
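For orientation, the endpoint is exercised through the Go API client's Jobs().Submission method, as the tests in this package do; the region and namespace values here match what those tests pass:
// Fetch the stored submission for version 0 of a previously registered job.
sub, _, err := nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
	Region:    "global",
	Namespace: "default",
})
must.NoError(t, err)
must.Eq(t, "hcl2", sub.Format)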

View File

@@ -0,0 +1,27 @@
job "huge" {
type = "batch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
meta {
key = "REPLACE"
}
group "group" {
task "task" {
driver = "raw_exec"
config {
command = "/usr/bin/false"
}
resources {
cpu = 10
memory = 16
}
}
}
}
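The REPLACE meta value is a placeholder; the max-size test below inflates it to push the job source past the 1 MB limit before registering, roughly:
// Blow the job source up to about 2 MB by swapping in a long meta value.
b, err := os.ReadFile("input/huge.hcl")
must.NoError(t, err)
huge := strings.Replace(string(b), "REPLACE", strings.Repeat("A", 2e6), 1)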

View File

@@ -0,0 +1,36 @@
variable "X" {
type = string
}
variable "Y" {
type = number
}
variable "Z" {
type = bool
}
job "xyz" {
type = "batch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "group" {
task "task" {
driver = "raw_exec"
config {
command = "echo"
args = ["X ${var.X}, Y ${var.Y}, Z ${var.Z}"]
}
resources {
cpu = 10
memory = 16
}
}
}
}
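The tests register this spec with concrete values for X, Y, and Z via CLI -var flags; a sketch of that call, taken from testRunCLIVarFlags:
// Register xyz.hcl, supplying the required HCL2 variables on the command line.
err := e2eutil.RegisterWithArgs(jobID, "input/xyz.hcl", "-var=X=foo", "-var=Y=42", "-var=Z=true")
must.NoError(t, err)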

View File

@@ -0,0 +1,277 @@
package jobsubmissions
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/acl"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/shoenig/test/must"
)
func TestJobSubmissionAPI(t *testing.T) {
nomad := e2eutil.NomadClient(t)
e2eutil.WaitForLeader(t, nomad)
e2eutil.WaitForNodesReady(t, nomad, 1)
t.Run("testParseAPI", testParseAPI)
t.Run("testRunCLIVarFlags", testRunCLIVarFlags)
t.Run("testSubmissionACL", testSubmissionACL)
t.Run("testMaxSize", testMaxSize)
}
func testParseAPI(t *testing.T) {
nomad := e2eutil.NomadClient(t)
jobID := "job-sub-parse-" + uuid.Short()
jobIDs := []string{jobID}
t.Cleanup(e2eutil.MaybeCleanupJobsAndGC(&jobIDs))
spec, err := os.ReadFile("input/xyz.hcl")
must.NoError(t, err)
job, err := nomad.Jobs().ParseHCLOpts(&api.JobsParseRequest{
JobHCL: string(spec),
HCLv1: false,
Variables: "X=\"baz\" \n Y=50 \n Z=true \n",
Canonicalize: true,
})
must.NoError(t, err)
args := job.TaskGroups[0].Tasks[0].Config["args"]
must.Eq(t, []any{"X baz, Y 50, Z true"}, args.([]any))
}
func testRunCLIVarFlags(t *testing.T) {
nomad := e2eutil.NomadClient(t)
jobID := "job-sub-cli-" + uuid.Short()
jobIDs := []string{jobID}
t.Cleanup(e2eutil.MaybeCleanupJobsAndGC(&jobIDs))
// register job via cli with var arguments
err := e2eutil.RegisterWithArgs(jobID, "input/xyz.hcl", "-var=X=foo", "-var=Y=42", "-var=Z=true")
must.NoError(t, err)
// find our alloc id
allocID := e2eutil.SingleAllocID(t, jobID, "default", 0)
// wait for alloc to complete
_ = e2eutil.WaitForAllocStopped(t, nomad, allocID)
// inspect alloc logs making sure our variables got set
out, err := e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
must.NoError(t, err)
must.Eq(t, "X foo, Y 42, Z true\n", out)
// check the submission api
sub, _, err := nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
Region: "global",
Namespace: "default",
})
must.NoError(t, err)
must.Eq(t, "hcl2", sub.Format)
must.NotEq(t, "", sub.Source)
must.Eq(t, map[string]string{"X": "foo", "Y": "42", "Z": "true"}, sub.VariableFlags)
must.Eq(t, "", sub.Variables)
// register job again with different var arguments
err = e2eutil.RegisterWithArgs(jobID, "input/xyz.hcl", "-var=X=bar", "-var=Y=99", "-var=Z=false")
must.NoError(t, err)
// find our alloc id
allocID = e2eutil.SingleAllocID(t, jobID, "default", 1)
// wait for alloc to complete
_ = e2eutil.WaitForAllocStopped(t, nomad, allocID)
// inspect alloc logs making sure our new variables got set
out, err = e2eutil.AllocLogs(allocID, "", e2eutil.LogsStdOut)
must.NoError(t, err)
must.Eq(t, "X bar, Y 99, Z false\n", out)
// check the submission api for v1
sub, _, err = nomad.Jobs().Submission(jobID, 1, &api.QueryOptions{
Region: "global",
Namespace: "default",
})
must.NoError(t, err)
must.Eq(t, "hcl2", sub.Format)
must.NotEq(t, "", sub.Source)
must.Eq(t, map[string]string{"X": "bar", "Y": "99", "Z": "false"}, sub.VariableFlags)
must.Eq(t, "", sub.Variables)
// check the submission api for v0 (make sure we still have it)
sub, _, err = nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
Region: "global",
Namespace: "default",
})
must.NoError(t, err)
must.Eq(t, "hcl2", sub.Format)
must.NotEq(t, "", sub.Source)
must.Eq(t, map[string]string{
"X": "foo",
"Y": "42",
"Z": "true",
}, sub.VariableFlags)
must.Eq(t, "", sub.Variables)
// deregister the job with purge
e2eutil.WaitForJobStopped(t, nomad, jobID)
// check the submission api for v0 after deregister (make sure it's gone)
sub, _, err = nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
Region: "global",
Namespace: "default",
})
must.ErrorContains(t, err, "job source not found")
must.Nil(t, sub)
}
func testSubmissionACL(t *testing.T) {
nomad := e2eutil.NomadClient(t)
// set up an ACL cleanup helper
aclCleanup := acl.NewCleanup()
defer aclCleanup.Run(t, nomad)
// create a namespace for ourselves
myNamespaceName := "submission-acl-" + uuid.Short()
namespaceClient := nomad.Namespaces()
_, err := namespaceClient.Register(&api.Namespace{
Name: myNamespaceName,
}, &api.WriteOptions{
Region: "global",
})
must.NoError(t, err)
aclCleanup.Add(myNamespaceName, acl.NamespaceTestResourceType)
// create a namespace for a token that will be blocked
otherNamespaceName := "submission-other-acl-" + uuid.Short()
_, err = namespaceClient.Register(&api.Namespace{
Name: otherNamespaceName,
}, &api.WriteOptions{
Region: "global",
})
must.NoError(t, err)
aclCleanup.Add(otherNamespaceName, acl.NamespaceTestResourceType)
// create an ACL policy with write access in our namespace
myNamespacePolicy := api.ACLPolicy{
Name: "submission-acl-" + uuid.Short(),
Rules: `namespace "` + myNamespaceName + `" {policy = "write"}`,
Description: "This namespace is for Job Submissions e2e testing",
}
_, err = nomad.ACLPolicies().Upsert(&myNamespacePolicy, nil)
must.NoError(t, err)
aclCleanup.Add(myNamespacePolicy.Name, acl.ACLPolicyTestResourceType)
// create an ACL policy to read in the other namespace
otherNamespacePolicy := api.ACLPolicy{
Name: "submission-other-acl-" + uuid.Short(),
Rules: `namespace "` + otherNamespaceName + `" {policy = "read"}`,
Description: "This is another namespace for Job Submissions e2e testing",
}
_, err = nomad.ACLPolicies().Upsert(&otherNamespacePolicy, nil)
must.NoError(t, err)
aclCleanup.Add(otherNamespacePolicy.Name, acl.ACLPolicyTestResourceType)
// create a token that can read in our namespace
aclTokensClient := nomad.ACLTokens()
myToken, _, err := aclTokensClient.Create(&api.ACLToken{
Name: "submission-my-read-token-" + uuid.Short(),
Type: "client",
Policies: []string{myNamespacePolicy.Name},
}, &api.WriteOptions{
Region: "global",
Namespace: myNamespaceName,
})
must.NoError(t, err)
aclCleanup.Add(myToken.AccessorID, acl.ACLTokenTestResourceType)
// create a token that can read in the other namespace
otherToken, _, err := aclTokensClient.Create(&api.ACLToken{
Name: "submission-other-read-token-" + uuid.Short(),
Type: "client",
Policies: []string{otherNamespacePolicy.Name},
}, &api.WriteOptions{
Region: "global",
Namespace: otherNamespaceName,
})
must.NoError(t, err)
aclCleanup.Add(otherToken.AccessorID, acl.ACLTokenTestResourceType)
// prepare to submit a job
jobID := "job-sub-cli-" + uuid.Short()
jobIDs := []string{jobID}
t.Cleanup(e2eutil.MaybeCleanupJobsAndGC(&jobIDs))
// register job via cli with var arguments (using management token)
err = e2eutil.RegisterWithArgs(jobID, "input/xyz.hcl", "-namespace", myNamespaceName, "-var=X=foo", "-var=Y=42", "-var=Z=true")
must.NoError(t, err)
// find our alloc id
allocID := e2eutil.SingleAllocID(t, jobID, myNamespaceName, 0)
// wait for alloc to complete
_ = e2eutil.WaitForAllocStopped(t, nomad, allocID)
// inspect alloc logs making sure our variables got set
out, err := e2eutil.AllocLogs(allocID, myNamespaceName, e2eutil.LogsStdOut)
must.NoError(t, err)
must.Eq(t, "X foo, Y 42, Z true\n", out)
// get submission using my token
sub, _, err := nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
Region: "global",
Namespace: myNamespaceName,
AuthToken: myToken.SecretID,
})
must.NoError(t, err)
must.Eq(t, "hcl2", sub.Format)
must.NotEq(t, "", sub.Source)
must.Eq(t, map[string]string{"X": "foo", "Y": "42", "Z": "true"}, sub.VariableFlags)
must.Eq(t, "", sub.Variables)
// get submission using other token (fail)
sub, _, err = nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
Region: "global",
Namespace: myNamespaceName,
AuthToken: otherToken.SecretID,
})
must.ErrorContains(t, err, "Permission denied")
must.Nil(t, sub)
}
func testMaxSize(t *testing.T) {
jobID := "job-sub-max-" + uuid.Short()
jobIDs := []string{jobID}
t.Cleanup(e2eutil.MaybeCleanupJobsAndGC(&jobIDs))
// modify huge.hcl to exceed the default 1 megabyte limit
b, err := os.ReadFile("input/huge.hcl")
must.NoError(t, err)
huge := strings.Replace(string(b), "REPLACE", strings.Repeat("A", 2e6), 1)
tmpDir := t.TempDir()
hugeFile := filepath.Join(tmpDir, "huge.hcl")
err = os.WriteFile(hugeFile, []byte(huge), 0o644)
must.NoError(t, err)
// register the huge job file and expect a warning
output, err := e2eutil.RegisterGetOutput(jobID, hugeFile)
must.NoError(t, err)
must.StrContains(t, output, "job source size of 2.0 MB exceeds maximum of 1.0 MB and will be discarded")
// check the submission api making sure it is not there
nomad := e2eutil.NomadClient(t)
sub, _, err := nomad.Jobs().Submission(jobID, 0, &api.QueryOptions{
Region: "global",
Namespace: "default",
})
must.ErrorContains(t, err, "job source not found")
must.Nil(t, sub)
}

View File

@@ -75,7 +75,7 @@ func (tc *OversubscriptionTest) TestDocker(f *framework.F) {
alloc := tc.runTest(f, "oversubscription-docker-", "docker.nomad")
// check that cgroup reports the memoryMaxMB as the limit within the container
stdout, err := e2eutil.AllocLogs(alloc.ID, e2eutil.LogsStdOut)
stdout, err := e2eutil.AllocLogs(alloc.ID, "", e2eutil.LogsStdOut)
f.NoError(err)
f.Equal(fmt.Sprintf("%d\n", 30*1024*1024), stdout)
}