tests: test logs endpoint against pending task

The really exciting change, though, is making WaitForRunning return the
allocations that it started. This should cut down test boilerplate
significantly.
Michael Schurter
2018-09-26 17:33:32 -07:00
parent 01f057e35d
commit 4d1a1ac5bb
4 changed files with 170 additions and 225 deletions
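
The new signature collapses the register/upsert/poll boilerplate at each call
site into a single line. As a minimal sketch of the pattern (using
mock.BatchJob and testutil.WaitForRunning exactly as in the diff below; the
server s and the *testing.T come from the usual test harness):

    // Build a batch job backed by the mock driver.
    job := mock.BatchJob()

    // Register the job, block until every alloc is out of pending,
    // and receive the alloc stubs instead of re-querying Alloc.List.
    alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

    // The returned stub exposes the fields tests need, e.g. alloc.ID.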


@@ -45,7 +45,8 @@ type MockDriverConfig struct {
     // StartErrRecoverable marks the error returned is recoverable
     StartErrRecoverable bool `mapstructure:"start_error_recoverable"`

-    // StartBlockFor specifies a duration in which to block before returning
+    // StartBlockFor specifies a duration in which to block Start before
+    // returning. Useful for testing the behavior of tasks in pending.
     StartBlockFor time.Duration `mapstructure:"start_block_for"`

     // KillAfter is the duration after which the mock driver indicates the task
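
For illustration, a sketch of how a test can use the new option to hold a task
in pending (this mirrors the TestFS_Logs_TaskPending setup added further down;
mock.BatchJob is from Nomad's mock package):

    job := mock.BatchJob()
    job.TaskGroups[0].Count = 1
    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
        // Block the mock driver's Start call so the task stays
        // pending long enough for the test to make its request.
        "start_block_for": "4s",
    }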


@@ -32,7 +32,7 @@ import (
 // tempAllocDir returns a new alloc dir that is rooted in a temp dir. The caller
 // should destroy the temp dir.
 func tempAllocDir(t testing.TB) *allocdir.AllocDir {
-    dir, err := ioutil.TempDir("", "")
+    dir, err := ioutil.TempDir("", "nomadtest")
     if err != nil {
         t.Fatalf("TempDir() failed: %v", err)
     }
@@ -536,68 +536,21 @@ func TestFS_Stream(t *testing.T) {
     })
     defer c.Shutdown()

-    // Force an allocation onto the node
     expected := "Hello from the other side"
-    a := mock.Alloc()
-    a.Job.Type = structs.JobTypeBatch
-    a.NodeID = c.NodeID()
-    a.Job.TaskGroups[0].Count = 1
-    a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
-        Name: "web",
-        Driver: "mock_driver",
-        Config: map[string]interface{}{
-            "run_for": "2s",
-            "stdout_string": expected,
-        },
-        LogConfig: structs.DefaultLogConfig(),
-        Resources: &structs.Resources{
-            CPU: 500,
-            MemoryMB: 256,
-        },
+    job := mock.BatchJob()
+    job.TaskGroups[0].Count = 1
+    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+        "run_for": "2s",
+        "stdout_string": expected,
     }

-    // Wait for the client to connect
-    testutil.WaitForResult(func() (bool, error) {
-        node, err := s.State().NodeByID(nil, c.NodeID())
-        if err != nil {
-            return false, err
-        }
-        if node == nil {
-            return false, fmt.Errorf("unknown node")
-        }
-        return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
-    }, func(err error) {
-        t.Fatal(err)
-    })
-
-    // Upsert the allocation
-    state := s.State()
-    require.Nil(state.UpsertJob(999, a.Job))
-    require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
-
-    // Wait for the client to run the allocation
-    testutil.WaitForResult(func() (bool, error) {
-        alloc, err := state.AllocByID(nil, a.ID)
-        if err != nil {
-            return false, err
-        }
-        if alloc == nil {
-            return false, fmt.Errorf("unknown alloc")
-        }
-        if alloc.ClientStatus != structs.AllocClientStatusComplete {
-            return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
-        }
-        return true, nil
-    }, func(err error) {
-        t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
-    })
+    // Wait for alloc to be running
+    alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

     // Make the request
     req := &cstructs.FsStreamRequest{
-        AllocID: a.ID,
-        Path: "alloc/logs/web.stdout.0",
+        AllocID: alloc.ID,
+        Path: "alloc/logs/worker.stdout.0",
         PlainText: true,
         QueryOptions: structs.QueryOptions{Region: "global"},
@@ -693,72 +646,25 @@ func TestFS_Stream_Follow(t *testing.T) {
     })
     defer c.Shutdown()

-    // Force an allocation onto the node
     expectedBase := "Hello from the other side"
     repeat := 10
-    a := mock.Alloc()
-    a.Job.Type = structs.JobTypeBatch
-    a.NodeID = c.NodeID()
-    a.Job.TaskGroups[0].Count = 1
-    a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
-        Name: "web",
-        Driver: "mock_driver",
-        Config: map[string]interface{}{
-            "run_for": "20s",
-            "stdout_string": expectedBase,
-            "stdout_repeat": repeat,
-            "stdout_repeat_duration": 200 * time.Millisecond,
-        },
-        LogConfig: structs.DefaultLogConfig(),
-        Resources: &structs.Resources{
-            CPU: 500,
-            MemoryMB: 256,
-        },
+    job := mock.BatchJob()
+    job.TaskGroups[0].Count = 1
+    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+        "run_for": "20s",
+        "stdout_string": expectedBase,
+        "stdout_repeat": repeat,
+        "stdout_repeat_duration": 200 * time.Millisecond,
     }

-    // Wait for the client to connect
-    testutil.WaitForResult(func() (bool, error) {
-        node, err := s.State().NodeByID(nil, c.NodeID())
-        if err != nil {
-            return false, err
-        }
-        if node == nil {
-            return false, fmt.Errorf("unknown node")
-        }
-        return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
-    }, func(err error) {
-        t.Fatal(err)
-    })
-
-    // Upsert the allocation
-    state := s.State()
-    require.Nil(state.UpsertJob(999, a.Job))
-    require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
-
-    // Wait for the client to run the allocation
-    testutil.WaitForResult(func() (bool, error) {
-        alloc, err := state.AllocByID(nil, a.ID)
-        if err != nil {
-            return false, err
-        }
-        if alloc == nil {
-            return false, fmt.Errorf("unknown alloc")
-        }
-        if alloc.ClientStatus != structs.AllocClientStatusRunning {
-            return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
-        }
-        return true, nil
-    }, func(err error) {
-        t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err)
-    })
+    // Wait for alloc to be running
+    alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

     // Make the request
     req := &cstructs.FsStreamRequest{
-        AllocID: a.ID,
-        Path: "alloc/logs/web.stdout.0",
+        AllocID: alloc.ID,
+        Path: "alloc/logs/worker.stdout.0",
         PlainText: true,
         Follow: true,
         QueryOptions: structs.QueryOptions{Region: "global"},
@@ -837,69 +743,22 @@ func TestFS_Stream_Limit(t *testing.T) {
     })
     defer c.Shutdown()

-    // Force an allocation onto the node
     var limit int64 = 5
     full := "Hello from the other side"
     expected := full[:limit]
-    a := mock.Alloc()
-    a.Job.Type = structs.JobTypeBatch
-    a.NodeID = c.NodeID()
-    a.Job.TaskGroups[0].Count = 1
-    a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
-        Name: "web",
-        Driver: "mock_driver",
-        Config: map[string]interface{}{
-            "run_for": "2s",
-            "stdout_string": full,
-        },
-        LogConfig: structs.DefaultLogConfig(),
-        Resources: &structs.Resources{
-            CPU: 500,
-            MemoryMB: 256,
-        },
+    job := mock.BatchJob()
+    job.TaskGroups[0].Count = 1
+    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+        "run_for": "2s",
+        "stdout_string": full,
     }

-    // Wait for the client to connect
-    testutil.WaitForResult(func() (bool, error) {
-        node, err := s.State().NodeByID(nil, c.NodeID())
-        if err != nil {
-            return false, err
-        }
-        if node == nil {
-            return false, fmt.Errorf("unknown node")
-        }
-        return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
-    }, func(err error) {
-        t.Fatal(err)
-    })
-
-    // Upsert the allocation
-    state := s.State()
-    require.Nil(state.UpsertJob(999, a.Job))
-    require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
-
-    // Wait for the client to run the allocation
-    testutil.WaitForResult(func() (bool, error) {
-        alloc, err := state.AllocByID(nil, a.ID)
-        if err != nil {
-            return false, err
-        }
-        if alloc == nil {
-            return false, fmt.Errorf("unknown alloc")
-        }
-        if alloc.ClientStatus != structs.AllocClientStatusComplete {
-            return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
-        }
-        return true, nil
-    }, func(err error) {
-        t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
-    })
+    // Wait for alloc to be running
+    alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

     // Make the request
     req := &cstructs.FsStreamRequest{
-        AllocID: a.ID,
+        AllocID: alloc.ID,
         Path: "alloc/logs/web.stdout.0",
         PlainText: true,
         Limit: limit,
@@ -1040,6 +899,116 @@ OUTER:
     }
 }

+// TestFS_Logs_TaskPending asserts that trying to stream logs for tasks which
+// have not started returns a 404 error.
+func TestFS_Logs_TaskPending(t *testing.T) {
+    t.Parallel()
+    require := require.New(t)
+
+    // Start a server and client
+    s := nomad.TestServer(t, nil)
+    defer s.Shutdown()
+    testutil.WaitForLeader(t, s.RPC)
+
+    c := TestClient(t, func(c *config.Config) {
+        c.Servers = []string{s.GetConfig().RPCAddr.String()}
+    })
+    defer c.Shutdown()
+
+    job := mock.BatchJob()
+    job.TaskGroups[0].Count = 1
+    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+        "start_block_for": "4s",
+    }
+
+    // Register job
+    args := &structs.JobRegisterRequest{}
+    args.Job = job
+    args.WriteRequest.Region = "global"
+    var jobResp structs.JobRegisterResponse
+    require.NoError(s.RPC("Job.Register", args, &jobResp))
+
+    // Get the allocation ID
+    var allocID string
+    testutil.WaitForResult(func() (bool, error) {
+        args := structs.AllocListRequest{}
+        args.Region = "global"
+        resp := structs.AllocListResponse{}
+        if err := s.RPC("Alloc.List", &args, &resp); err != nil {
+            return false, err
+        }
+
+        if len(resp.Allocations) != 1 {
+            return false, fmt.Errorf("expected 1 alloc, found %d", len(resp.Allocations))
+        }
+
+        allocID = resp.Allocations[0].ID
+        return true, nil
+    }, func(err error) {
+        t.Fatalf("error getting alloc id: %v", err)
+    })
+
+    // Make the request
+    req := &cstructs.FsLogsRequest{
+        AllocID: allocID,
+        Task: job.TaskGroups[0].Tasks[0].Name,
+        LogType: "stdout",
+        Origin: "start",
+        PlainText: true,
+        QueryOptions: structs.QueryOptions{Region: "global"},
+    }
+
+    // Get the handler
+    handler, err := c.StreamingRpcHandler("FileSystem.Logs")
+    require.Nil(err)
+
+    // Create a pipe
+    p1, p2 := net.Pipe()
+    defer p1.Close()
+    defer p2.Close()
+
+    errCh := make(chan error)
+    streamMsg := make(chan *cstructs.StreamErrWrapper)
+
+    // Start the handler
+    go handler(p2)
+
+    // Start the decoder
+    go func() {
+        decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
+        for {
+            var msg cstructs.StreamErrWrapper
+            if err := decoder.Decode(&msg); err != nil {
+                if err == io.EOF || strings.Contains(err.Error(), "closed") {
+                    return
+                }
+                errCh <- fmt.Errorf("error decoding: %v", err)
+            }
+            streamMsg <- &msg
+        }
+    }()
+
+    // Send the request
+    encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
+    require.Nil(encoder.Encode(req))
+
+    for {
+        select {
+        case <-time.After(3 * time.Second):
+            t.Fatal("timeout")
+        case err := <-errCh:
+            t.Fatalf("unexpected stream error: %v", err)
+        case msg := <-streamMsg:
+            require.NotNil(msg.Error)
+            require.NotNil(msg.Error.Code)
+            require.EqualValues(404, *msg.Error.Code)
+            require.Contains(msg.Error.Message, "not started")
+            return
+        }
+    }
+}
+
 func TestFS_Logs_ACL(t *testing.T) {
     t.Parallel()
     require := require.New(t)
@@ -1174,7 +1143,6 @@ func TestFS_Logs(t *testing.T) {
     })
     defer c.Shutdown()

-    // Force an allocation onto the node
     expected := "Hello from the other side\n"
     job := mock.BatchJob()
     job.TaskGroups[0].Count = 1
@@ -1183,21 +1151,6 @@ func TestFS_Logs(t *testing.T) {
"stdout_string": expected,
}
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
node, err := s.State().NodeByID(nil, c.NodeID())
if err != nil {
return false, err
}
if node == nil {
return false, fmt.Errorf("unknown node")
}
return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status")
}, func(err error) {
t.Fatal(err)
})
// Wait for client to be running job
testutil.WaitForRunning(t, s.RPC, job)
@@ -1291,7 +1244,6 @@ func TestFS_Logs_Follow(t *testing.T) {
     })
     defer c.Shutdown()

-    // Force an allocation onto the node
     expectedBase := "Hello from the other side\n"
     repeat := 10
@@ -1305,19 +1257,11 @@ func TestFS_Logs_Follow(t *testing.T) {
     }

     // Wait for client to be running job
-    testutil.WaitForRunning(t, s.RPC, job)
-
-    // Get the allocation ID
-    args := structs.AllocListRequest{}
-    args.Region = "global"
-    resp := structs.AllocListResponse{}
-    require.NoError(s.RPC("Alloc.List", &args, &resp))
-    require.Len(resp.Allocations, 1)
-    allocID := resp.Allocations[0].ID
+    alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

     // Make the request
     req := &cstructs.FsLogsRequest{
-        AllocID: allocID,
+        AllocID: alloc.ID,
         Task: job.TaskGroups[0].Tasks[0].Name,
         LogType: "stdout",
         Origin: "start",


@@ -453,7 +453,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
     // Wait for the client to run it
     testutil.WaitForResult(func() (bool, error) {
-        if _, err := s.client.GetClientAlloc(alloc.ID); err != nil {
+        if _, err := s.client.GetAllocState(alloc.ID); err != nil {
             return false, err
         }


@@ -87,42 +87,40 @@ func WaitForLeader(t testing.T, rpc rpcFn) {
     })
 }

-// WaitForRunning runs a job and blocks until it is running.
-func WaitForRunning(t testing.T, rpc rpcFn, job *structs.Job) {
-    registered := false
+// WaitForRunning runs a job and blocks until all allocs are out of pending.
+func WaitForRunning(t testing.T, rpc rpcFn, job *structs.Job) []*structs.AllocListStub {
     WaitForResult(func() (bool, error) {
-        if !registered {
-            args := &structs.JobRegisterRequest{}
-            args.Job = job
-            args.WriteRequest.Region = "global"
-            var jobResp structs.JobRegisterResponse
-            err := rpc("Job.Register", args, &jobResp)
-            if err != nil {
-                return false, fmt.Errorf("Job.Register error: %v", err)
-            }
+        args := &structs.JobRegisterRequest{}
+        args.Job = job
+        args.WriteRequest.Region = "global"
+        var jobResp structs.JobRegisterResponse
+        err := rpc("Job.Register", args, &jobResp)
+        return err == nil, fmt.Errorf("Job.Register error: %v", err)
+    }, func(err error) {
+        t.Fatalf("error registering job: %v", err)
+    })

-            // Only register once
-            registered = true
-        }
+    t.Logf("Job %q registered", job.ID)

-        args := &structs.JobSummaryRequest{}
+    var resp structs.JobAllocationsResponse
+    WaitForResult(func() (bool, error) {
+        args := &structs.JobSpecificRequest{}
         args.JobID = job.ID
         args.QueryOptions.Region = "global"
-        var resp structs.JobSummaryResponse
-        err := rpc("Job.Summary", args, &resp)
+        err := rpc("Job.Allocations", args, &resp)
         if err != nil {
-            return false, fmt.Errorf("Job.Summary error: %v", err)
+            return false, fmt.Errorf("Job.Allocations error: %v", err)
         }

-        tgs := len(job.TaskGroups)
-        summaries := len(resp.JobSummary.Summary)
-        if tgs != summaries {
-            return false, fmt.Errorf("task_groups=%d summaries=%d", tgs, summaries)
+        if len(resp.Allocations) == 0 {
+            return false, fmt.Errorf("0 allocations")
         }

-        for tg, summary := range resp.JobSummary.Summary {
-            if summary.Running == 0 {
-                return false, fmt.Errorf("task_group=%s %#v", tg, resp.JobSummary.Summary)
+        for _, alloc := range resp.Allocations {
+            if alloc.ClientStatus != structs.AllocClientStatusRunning {
+                return false, fmt.Errorf("alloc not running: id=%v tg=%v status=%v",
+                    alloc.ID, alloc.TaskGroup, alloc.ClientStatus)
             }
         }
@@ -130,4 +128,6 @@ func WaitForRunning(t testing.T, rpc rpcFn, job *structs.Job) {
     }, func(err error) {
         t.Fatalf("job not running: %v", err)
     })
+
+    return resp.Allocations
 }