From c76b311d06e2bed028768e605a5fd5e55101c92e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 31 Jan 2018 13:49:21 -0800 Subject: [PATCH] client tests --- client/fs_endpoint_test.go | 503 ++++++++++++++++++++++++++++ client/rpc.go | 4 +- nomad/client_fs_endpoint_test.go | 4 + nomad/client_stats_endpoint_test.go | 2 + 4 files changed, 511 insertions(+), 2 deletions(-) create mode 100644 client/fs_endpoint_test.go diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go new file mode 100644 index 000000000..1fda336c3 --- /dev/null +++ b/client/fs_endpoint_test.go @@ -0,0 +1,503 @@ +package client + +import ( + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" + "github.com/ugorji/go/codec" +) + +func TestFS_Logs_NoAlloc(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a client + c := TestClient(t, nil) + defer c.Shutdown() + + // Make the request with bad allocation id + req := &cstructs.FsLogsRequest{ + AllocID: uuid.Generate(), + Task: "foo", + LogType: "stdout", + Origin: "start", + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler + handler, err := c.StreamingRpcHandler("FileSystem.Logs") + require.Nil(err) + + // Create a pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler + go handler(p2) + + // Start the decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(3 * time.Second) + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + t.Logf("Got msg %+v", msg) + if msg.Error == nil { + continue + } + + if strings.Contains(msg.Error.Error(), "unknown alloc") { + break OUTER + } else { + t.Fatalf("bad error: %v", err) + } + } + } +} + +func TestFS_Logs_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server + s, root := nomad.TestACLServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + client := TestClient(t, func(c *config.Config) { + c.ACLEnabled = true + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + defer client.Shutdown() + + // Create a bad token + policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", + []string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS}) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) + + cases := []struct { + Name string + Token string + ExpectedError string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedError: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedError: "unknown allocation", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedError: "unknown allocation", + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + // Make the request with bad allocation id + req := &cstructs.FsLogsRequest{ + AllocID: uuid.Generate(), + Task: "foo", + LogType: "stdout", + Origin: "start", + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: "global", + AuthToken: c.Token, + }, + } + + // Get the handler + handler, err := client.StreamingRpcHandler("FileSystem.Logs") + require.Nil(err) + + // Create a pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler + go handler(p2) + + // Start the decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + if strings.Contains(msg.Error.Error(), c.ExpectedError) { + break OUTER + } else { + t.Fatalf("Bad error: %v", msg.Error) + } + } + } + }) + } +} + +func TestFS_Logs(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s := nomad.TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + c := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + defer c.Shutdown() + + // Force an allocation onto the node + expected := "Hello from the other side" + a := mock.Alloc() + a.Job.Type = structs.JobTypeBatch + a.NodeID = c.NodeID() + a.Job.TaskGroups[0].Count = 1 + a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ + Name: "web", + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": "2s", + "stdout_string": expected, + }, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + } + + // Wait for the client to connect + testutil.WaitForResult(func() (bool, error) { + node, err := s.State().NodeByID(nil, c.NodeID()) + if err != nil { + return false, err + } + if node == nil { + return false, fmt.Errorf("unknown node") + } + + return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status") + }, func(err error) { + t.Fatal(err) + }) + + // Upsert the allocation + state := s.State() + require.Nil(state.UpsertJob(999, a.Job)) + require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) + + // Wait for the client to run the allocation + testutil.WaitForResult(func() (bool, error) { + alloc, err := state.AllocByID(nil, a.ID) + if err != nil { + return false, err + } + if alloc == nil { + return false, fmt.Errorf("unknown alloc") + } + if alloc.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) + } + + return true, nil + }, func(err error) { + t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) + }) + + // Make the request + req := &cstructs.FsLogsRequest{ + AllocID: a.ID, + Task: a.Job.TaskGroups[0].Tasks[0].Name, + LogType: "stdout", + Origin: "start", + PlainText: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler + handler, err := c.StreamingRpcHandler("FileSystem.Logs") + require.Nil(err) + + // Create a pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler + go handler(p2) + + // Start the decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(3 * time.Second) + received := "" +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + // Add the payload + received += string(msg.Payload) + if received == expected { + break OUTER + } + } + } +} + +func TestFS_Logs_Follow(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s := nomad.TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + c := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + defer c.Shutdown() + + // Force an allocation onto the node + expectedBase := "Hello from the other side" + repeat := 10 + + a := mock.Alloc() + a.Job.Type = structs.JobTypeBatch + a.NodeID = c.NodeID() + a.Job.TaskGroups[0].Count = 1 + a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ + Name: "web", + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": "20s", + "stdout_string": expectedBase, + "stdout_repeat": repeat, + "stdout_repeat_duration": 200 * time.Millisecond, + }, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + } + + // Wait for the client to connect + testutil.WaitForResult(func() (bool, error) { + node, err := s.State().NodeByID(nil, c.NodeID()) + if err != nil { + return false, err + } + if node == nil { + return false, fmt.Errorf("unknown node") + } + + return node.Status == structs.NodeStatusReady, fmt.Errorf("bad node status") + }, func(err error) { + t.Fatal(err) + }) + + // Upsert the allocation + state := s.State() + require.Nil(state.UpsertJob(999, a.Job)) + require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a})) + + // Wait for the client to run the allocation + testutil.WaitForResult(func() (bool, error) { + alloc, err := state.AllocByID(nil, a.ID) + if err != nil { + return false, err + } + if alloc == nil { + return false, fmt.Errorf("unknown alloc") + } + if alloc.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) + } + + return true, nil + }, func(err error) { + t.Fatalf("Alloc on node %q not running: %v", c.NodeID(), err) + }) + + // Make the request + req := &cstructs.FsLogsRequest{ + AllocID: a.ID, + Task: a.Job.TaskGroups[0].Tasks[0].Name, + LogType: "stdout", + Origin: "start", + PlainText: true, + Follow: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler + handler, err := s.StreamingRpcHandler("FileSystem.Logs") + require.Nil(err) + + // Create a pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler + go handler(p2) + + // Start the decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(20 * time.Second) + expected := strings.Repeat(expectedBase, repeat+1) + received := "" +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + // Add the payload + received += string(msg.Payload) + if received == expected { + break OUTER + } + } + } +} diff --git a/client/rpc.go b/client/rpc.go index 632e9b8e3..9ca0e81c3 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -35,9 +35,9 @@ func (c *Client) ClientRPC(method string, args interface{}, reply interface{}) e return codec.Err } -// ClientStreamingRpcHandler is used to make a local, client only streaming RPC +// StreamingRpcHandler is used to make a local, client only streaming RPC // call. -func (c *Client) ClientStreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) { +func (c *Client) StreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) { return c.streamingRpcs.GetHandler(method) } diff --git a/nomad/client_fs_endpoint_test.go b/nomad/client_fs_endpoint_test.go index b488a6dc9..80d7cb4e5 100644 --- a/nomad/client_fs_endpoint_test.go +++ b/nomad/client_fs_endpoint_test.go @@ -214,6 +214,7 @@ func TestClientFS_Logs_Local(t *testing.T) { c := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) + defer c.Shutdown() // Force an allocation onto the node expected := "Hello from the other side" @@ -346,6 +347,7 @@ func TestClientFS_Logs_Local_Follow(t *testing.T) { c := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) + defer c.Shutdown() // Force an allocation onto the node expectedBase := "Hello from the other side" @@ -490,6 +492,7 @@ func TestClientFS_Logs_Remote_Server(t *testing.T) { c := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) + defer c.Shutdown() // Force an allocation onto the node expected := "Hello from the other side" @@ -637,6 +640,7 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) { c.Servers = []string{s2.config.RPCAddr.String()} c.Region = "two" }) + defer c.Shutdown() // Force an allocation onto the node expected := "Hello from the other side" diff --git a/nomad/client_stats_endpoint_test.go b/nomad/client_stats_endpoint_test.go index 4bec05d52..898a4f237 100644 --- a/nomad/client_stats_endpoint_test.go +++ b/nomad/client_stats_endpoint_test.go @@ -28,6 +28,7 @@ func TestClientStats_Stats_Local(t *testing.T) { c := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) + defer c.Shutdown() testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() @@ -158,6 +159,7 @@ func TestClientStats_Stats_Remote(t *testing.T) { c := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) + defer c.Shutdown() testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes()