From 7e5a30d5ae04fa0f2916f8410d8f1ad5259fa8ad Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 31 Jan 2018 17:35:21 -0800 Subject: [PATCH] Agent logs --- client/fs_endpoint.go | 6 +- client/fs_endpoint_test.go | 207 +++++++++++++ client/rpc.go | 82 +++++ command/agent/fs_endpoint.go | 141 ++++----- command/agent/fs_endpoint_test.go | 481 ++++-------------------------- demo/vagrant/client1.hcl | 4 + demo/vagrant/client2.hcl | 4 + nomad/client_fs_endpoint.go | 4 +- nomad/client_rpc.go | 22 -- nomad/structs/streaming_rpc.go | 21 ++ 10 files changed, 437 insertions(+), 535 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 4b73461c2..47629c787 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -211,7 +211,11 @@ func (f *FileSystem) Logs(conn io.ReadWriteCloser) { cancel() return } - errCh <- err + select { + case errCh <- err: + case <-ctx.Done(): + return + } } } }() diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 1fda336c3..38d19f491 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -3,12 +3,14 @@ package client import ( "fmt" "io" + "math" "net" "strings" "testing" "time" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/uuid" @@ -501,3 +503,208 @@ OUTER: } } } + +func TestFS_findClosest(t *testing.T) { + task := "foo" + entries := []*allocdir.AllocFileInfo{ + { + Name: "foo.stdout.0", + Size: 100, + }, + { + Name: "foo.stdout.1", + Size: 100, + }, + { + Name: "foo.stdout.2", + Size: 100, + }, + { + Name: "foo.stdout.3", + Size: 100, + }, + { + Name: "foo.stderr.0", + Size: 100, + }, + { + Name: "foo.stderr.1", + Size: 100, + }, + { + Name: "foo.stderr.2", + Size: 100, + }, + } + + cases := []struct { + Entries []*allocdir.AllocFileInfo + DesiredIdx int64 + DesiredOffset int64 + Task string + LogType string + ExpectedFile string + ExpectedIdx int64 + ExpectedOffset int64 + Error bool + }{ + // Test error cases + { + Entries: nil, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + Error: true, + }, + { + Entries: entries[0:3], + DesiredIdx: 0, + Task: task, + LogType: "stderr", + Error: true, + }, + + // Test beginning cases + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 0, + DesiredOffset: -100, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 1, + DesiredOffset: -1000, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stderr", + ExpectedFile: entries[4].Name, + ExpectedIdx: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + + // Test middle cases + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 110, + Task: task, + LogType: "stdout", + ExpectedFile: entries[2].Name, + ExpectedIdx: 2, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stderr", + ExpectedFile: entries[5].Name, + ExpectedIdx: 1, + }, + // Test end cases + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 100, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: -10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 90, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stderr", + ExpectedFile: entries[6].Name, + ExpectedIdx: 2, + }, + } + + for i, c := range cases { + entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType) + if err != nil { + if !c.Error { + t.Fatalf("case %d: Unexpected error: %v", i, err) + } + continue + } + + if entry.Name != c.ExpectedFile { + t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile) + } + if idx != c.ExpectedIdx { + t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx) + } + if offset != c.ExpectedOffset { + t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset) + } + } +} diff --git a/client/rpc.go b/client/rpc.go index 9ca0e81c3..475ce9d86 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -9,6 +9,7 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/nomad/client/servers" inmem "github.com/hashicorp/nomad/helper/codec" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/nomad/structs" @@ -104,6 +105,87 @@ func canRetry(args interface{}, err error) bool { return false } +// RemoteStreamingRpcHandler is used to make a streaming RPC call to a remote +// server. +func (c *Client) RemoteStreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) { + server := c.servers.FindServer() + if server == nil { + return nil, noServersErr + } + + conn, err := c.streamingRpcConn(server, method) + if err != nil { + // Move off to another server + c.logger.Printf("[ERR] nomad: %q RPC failed to server %s: %v", method, server.Addr, err) + c.servers.NotifyFailedServer(server) + return nil, err + } + + return bridgedStreamingRpcHandler(conn), nil +} + +// bridgedStreamingRpcHandler creates a bridged streaming RPC handler by copying +// data between the two sides. +func bridgedStreamingRpcHandler(sideA io.ReadWriteCloser) structs.StreamingRpcHandler { + return func(sideB io.ReadWriteCloser) { + defer sideA.Close() + defer sideB.Close() + structs.Bridge(sideA, sideB) + } +} + +// streamingRpcConn is used to retrieve a connection to a server to conduct a +// streaming RPC. +func (c *Client) streamingRpcConn(server *servers.Server, method string) (net.Conn, error) { + // Dial the server + conn, err := net.DialTimeout("tcp", server.Addr.String(), 10*time.Second) + if err != nil { + return nil, err + } + + // Cast to TCPConn + if tcp, ok := conn.(*net.TCPConn); ok { + tcp.SetKeepAlive(true) + tcp.SetNoDelay(true) + } + + // TODO TLS + // Check if TLS is enabled + //if p.tlsWrap != nil { + //// Switch the connection into TLS mode + //if _, err := conn.Write([]byte{byte(RpcTLS)}); err != nil { + //conn.Close() + //return nil, err + //} + + //// Wrap the connection in a TLS client + //tlsConn, err := p.tlsWrap(region, conn) + //if err != nil { + //conn.Close() + //return nil, err + //} + //conn = tlsConn + //} + + // Write the multiplex byte to set the mode + if _, err := conn.Write([]byte{byte(pool.RpcStreaming)}); err != nil { + conn.Close() + return nil, err + } + + // Send the header + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + header := structs.StreamingRpcHeader{ + Method: method, + } + if err := encoder.Encode(header); err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + // setupClientRpc is used to setup the Client's RPC endpoints func (c *Client) setupClientRpc() { // Initialize the RPC handlers diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 842200dc0..dd2028bfb 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -11,9 +11,9 @@ import ( "net/http" "strconv" "strings" - "time" "github.com/docker/docker/pkg/ioutils" + "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -28,73 +28,42 @@ var ( invalidOrigin = fmt.Errorf("origin must be start or end") ) -const ( - // streamFrameSize is the maximum number of bytes to send in a single frame - streamFrameSize = 64 * 1024 - - // streamHeartbeatRate is the rate at which a heartbeat will occur to detect - // a closed connection without sending any additional data - streamHeartbeatRate = 1 * time.Second - - // streamBatchWindow is the window in which file content is batched before - // being flushed if the frame size has not been hit. - streamBatchWindow = 200 * time.Millisecond - - // nextLogCheckRate is the rate at which we check for a log entry greater - // than what we are watching for. This is to handle the case in which logs - // rotate faster than we can detect and we have to rely on a normal - // directory listing. - nextLogCheckRate = 100 * time.Millisecond - - // deleteEvent and truncateEvent are the file events that can be sent in a - // StreamFrame - deleteEvent = "file deleted" - truncateEvent = "file truncated" - - // OriginStart and OriginEnd are the available parameters for the origin - // argument when streaming a file. They respectively offset from the start - // and end of a file. - OriginStart = "start" - OriginEnd = "end" -) - func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - //if s.agent.client == nil { - //return nil, clientNotRunning - //} - var secret string - s.parseToken(req, &secret) - var namespace string + s.parseToken(req, &secret) parseNamespace(req, &namespace) - //aclObj, err := s.agent.Client().ResolveToken(secret) - //if err != nil { - //return nil, err - //} + var aclObj *acl.ACL + if s.agent.client != nil { + var err error + aclObj, err = s.agent.Client().ResolveToken(secret) + if err != nil { + return nil, err + } + } path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/") switch { case strings.HasPrefix(path, "ls/"): - //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - //return nil, structs.ErrPermissionDenied - //} + if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + return nil, structs.ErrPermissionDenied + } return s.DirectoryListRequest(resp, req) case strings.HasPrefix(path, "stat/"): - //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - //return nil, structs.ErrPermissionDenied - //} + if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + return nil, structs.ErrPermissionDenied + } return s.FileStatRequest(resp, req) case strings.HasPrefix(path, "readat/"): - //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - //return nil, structs.ErrPermissionDenied - //} + if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + return nil, structs.ErrPermissionDenied + } return s.FileReadAtRequest(resp, req) case strings.HasPrefix(path, "cat/"): - //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - //return nil, structs.ErrPermissionDenied - //} + if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + return nil, structs.ErrPermissionDenied + } return s.FileCatRequest(resp, req) //case strings.HasPrefix(path, "stream/"): //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { @@ -102,14 +71,6 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int //} //return s.Stream(resp, req) case strings.HasPrefix(path, "logs/"): - // Logs can be accessed with ReadFS or ReadLogs caps - //if aclObj != nil { - //readfs := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) - //logs := aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadLogs) - //if !readfs && !logs { - //return nil, structs.ErrPermissionDenied - //} - //} return s.Logs(resp, req) default: return nil, CodedError(404, ErrInvalidMethod) @@ -504,11 +465,38 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - // TODO make work for both - // Get the client's handler - handler, err := s.agent.Server().StreamingRpcHandler("FileSystem.Logs") - if err != nil { - return nil, err + localClient := s.agent.Client() + localServer := s.agent.Server() + + // See if the local client can handle the request. + localAlloc := false + if localClient != nil { + _, err := localClient.GetClientAlloc(allocID) + if err == nil { + localAlloc = true + } + } + + // Only use the client RPC to server if we don't have a server and the local + // client can't handle the call. + useClientRPC := localClient != nil && !localAlloc && localServer == nil + + // Use the server as a last case. + useServerRPC := localServer != nil + + // Get the correct handler + var handler structs.StreamingRpcHandler + var handlerErr error + if localAlloc { + handler, handlerErr = localClient.StreamingRpcHandler("FileSystem.Logs") + } else if useClientRPC { + handler, handlerErr = localClient.RemoteStreamingRpcHandler("FileSystem.Logs") + } else if useServerRPC { + handler, handlerErr = localServer.StreamingRpcHandler("FileSystem.Logs") + } + + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) } // Create the request arguments @@ -521,7 +509,7 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac PlainText: plain, Follow: follow, } - s.parseToken(req, &fsReq.QueryOptions.AuthToken) + s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions) p1, p2 := net.Pipe() decoder := codec.NewDecoder(p1, structs.MsgpackHandle) @@ -532,7 +520,6 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac go func() { <-ctx.Done() p1.Close() - s.logger.Printf("--------- HTTP: Request finished. Closing pipes") }() // Create a channel that decodes the results @@ -550,19 +537,16 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac case <-ctx.Done(): errCh <- nil cancel() - s.logger.Printf("--------- HTTP: Exitting frame copier") return default: } var res cstructs.StreamErrWrapper if err := decoder.Decode(&res); err != nil { - //errCh <- CodedError(500, err.Error()) - errCh <- CodedError(501, err.Error()) + errCh <- CodedError(500, err.Error()) cancel() return } - s.logger.Printf("--------- HTTP: Decoded stream wrapper") if err := res.Error; err != nil { if err.Code != nil { @@ -572,14 +556,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac } } - s.logger.Printf("--------- HTTP: Copying payload of size: %d", len(res.Payload)) - if n, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil { - //errCh <- CodedError(500, err.Error()) - errCh <- CodedError(502, err.Error()) + if _, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil { + errCh <- CodedError(500, err.Error()) cancel() return - } else { - s.logger.Printf("--------- HTTP: Copied payload: %d bytes", n) } } }() @@ -587,7 +567,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac handler(p2) cancel() codedErr := <-errCh - if codedErr != nil && (codedErr == io.EOF || strings.Contains(codedErr.Error(), "closed")) { + if codedErr != nil && + (codedErr == io.EOF || + strings.Contains(codedErr.Error(), "closed") || + strings.Contains(codedErr.Error(), "EOF")) { codedErr = nil } return nil, codedErr diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 63fc929a2..5d0b07e49 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -2,7 +2,6 @@ package agent import ( "fmt" - "io" "net/http" "net/http/httptest" "testing" @@ -10,19 +9,9 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -type WriteCloseChecker struct { - io.WriteCloser - Closed bool -} - -func (w *WriteCloseChecker) Close() error { - w.Closed = true - return w.WriteCloser.Close() -} - func TestAllocDirFS_List_MissingParams(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { @@ -107,124 +96,65 @@ func TestAllocDirFS_ReadAt_MissingParams(t *testing.T) { func TestAllocDirFS_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) - for _, endpoint := range []string{"ls", "stat", "readat", "cat", "stream"} { - httpACLTest(t, nil, func(s *TestAgent) { - state := s.Agent.server.State() + // TODO This whole thing can go away since the ACLs should be tested in the + // RPC test + //for _, endpoint := range []string{"ls", "stat", "readat", "cat", "stream"} { + for _, endpoint := range []string{"ls", "stat", "readat", "cat"} { + t.Run(endpoint, func(t *testing.T) { - req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/fs/%s/", endpoint), nil) - assert.Nil(err) + httpACLTest(t, nil, func(s *TestAgent) { + state := s.Agent.server.State() - // Try request without a token and expect failure - { - respW := httptest.NewRecorder() - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) - } + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/client/fs/%s/", endpoint), nil) + require.Nil(err) - // Try request with an invalid token and expect failure - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadLogs}) - token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) - } + // Try request without a token and expect failure + { + respW := httptest.NewRecorder() + _, err := s.Server.FsRequest(respW, req) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } - // Try request with a valid token - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) - token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(allocIDNotPresentErr, err) - } + // Try request with an invalid token and expect failure + { + respW := httptest.NewRecorder() + policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadLogs}) + token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", policy) + setToken(req, token) + _, err := s.Server.FsRequest(respW, req) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } - // Try request with a management token - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - setToken(req, s.RootToken) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(allocIDNotPresentErr, err) - } + // Try request with a valid token + // No alloc id set, so expect an error - just not a permissions error + { + respW := httptest.NewRecorder() + policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) + token := mock.CreatePolicyAndToken(t, state, 1007, "valid", policy) + setToken(req, token) + _, err := s.Server.FsRequest(respW, req) + require.NotNil(err) + require.Equal(allocIDNotPresentErr, err) + } + + // Try request with a management token + // No alloc id set, so expect an error - just not a permissions error + { + respW := httptest.NewRecorder() + setToken(req, s.RootToken) + _, err := s.Server.FsRequest(respW, req) + require.NotNil(err) + require.Equal(allocIDNotPresentErr, err) + } + }) }) } } -func TestAllocDirFS_Logs_ACL(t *testing.T) { - t.Parallel() - assert := assert.New(t) - - httpACLTest(t, nil, func(s *TestAgent) { - state := s.Agent.server.State() - - req, err := http.NewRequest("GET", "/v1/client/fs/logs/", nil) - assert.Nil(err) - - // Try request without a token and expect failure - { - respW := httptest.NewRecorder() - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) - } - - // Try request with an invalid token and expect failure - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) - token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) - } - - // Try request with a valid token (ReadFS) - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) - token := mock.CreatePolicyAndToken(t, state, 1007, "valid1", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(allocIDNotPresentErr, err) - } - - // Try request with a valid token (ReadLogs) - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadLogs}) - token := mock.CreatePolicyAndToken(t, state, 1009, "valid2", policy) - setToken(req, token) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(allocIDNotPresentErr, err) - } - - // Try request with a management token - // No alloc id set, so expect an error - just not a permissions error - { - respW := httptest.NewRecorder() - setToken(req, s.RootToken) - _, err := s.Server.FsRequest(respW, req) - assert.NotNil(err) - assert.Equal(allocIDNotPresentErr, err) - } - }) -} - /* func TestHTTP_Stream_MissingParams(t *testing.T) { t.Parallel() @@ -758,315 +688,4 @@ func TestHTTP_Logs_Follow(t *testing.T) { }) }) } - -func BenchmarkHTTP_Logs_Follow(t *testing.B) { - runtime.MemProfileRate = 1 - - s := makeHTTPServer(t, nil) - defer s.Shutdown() - testutil.WaitForLeader(t, s.Agent.RPC) - - // Get a temp alloc dir and create the log dir - ad := tempAllocDir(t) - s.Agent.logger.Printf("ALEX: LOG DIR: %q", ad.SharedDir) - //defer os.RemoveAll(ad.AllocDir) - - logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) - if err := os.MkdirAll(logDir, 0777); err != nil { - t.Fatalf("Failed to make log dir: %v", err) - } - - // Create a series of log files in the temp dir - task := "foo" - logType := "stdout" - expected := make([]byte, 1024*1024*100) - initialWrites := 3 - - writeToFile := func(index int, data []byte) { - logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) - logFilePath := filepath.Join(logDir, logFile) - err := ioutil.WriteFile(logFilePath, data, 777) - if err != nil { - t.Fatalf("Failed to create file: %v", err) - } - } - - part := (len(expected) / 3) - 50 - goodEnough := (8 * len(expected)) / 10 - for i := 0; i < initialWrites; i++ { - writeToFile(i, expected[i*part:(i+1)*part]) - } - - t.ResetTimer() - for i := 0; i < t.N; i++ { - s.Agent.logger.Printf("BENCHMARK %d", i) - - // Create a decoder - r, w := io.Pipe() - wrappedW := &WriteCloseChecker{WriteCloser: w} - defer r.Close() - defer w.Close() - dec := codec.NewDecoder(r, structs.JsonHandle) - - var received []byte - - // Start the reader - fullResultCh := make(chan struct{}) - go func() { - for { - var frame sframer.StreamFrame - if err := dec.Decode(&frame); err != nil { - if err == io.EOF { - t.Logf("EOF") - return - } - - t.Fatalf("failed to decode: %v", err) - } - - if frame.IsHeartbeat() { - continue - } - - received = append(received, frame.Data...) - if len(received) > goodEnough { - close(fullResultCh) - return - } - } - }() - - // Start streaming logs - go func() { - if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { - t.Fatalf("logs() failed: %v", err) - } - }() - - select { - case <-fullResultCh: - case <-time.After(time.Duration(60 * time.Second)): - t.Fatalf("did not receive data: %d < %d", len(received), goodEnough) - } - - s.Agent.logger.Printf("ALEX: CLOSING") - - // Close the reader - r.Close() - s.Agent.logger.Printf("ALEX: CLOSED") - - s.Agent.logger.Printf("ALEX: WAITING FOR WRITER TO CLOSE") - testutil.WaitForResult(func() (bool, error) { - return wrappedW.Closed, nil - }, func(err error) { - t.Fatalf("connection not closed") - }) - s.Agent.logger.Printf("ALEX: WRITER CLOSED") - } -} - -func TestLogs_findClosest(t *testing.T) { - task := "foo" - entries := []*allocdir.AllocFileInfo{ - { - Name: "foo.stdout.0", - Size: 100, - }, - { - Name: "foo.stdout.1", - Size: 100, - }, - { - Name: "foo.stdout.2", - Size: 100, - }, - { - Name: "foo.stdout.3", - Size: 100, - }, - { - Name: "foo.stderr.0", - Size: 100, - }, - { - Name: "foo.stderr.1", - Size: 100, - }, - { - Name: "foo.stderr.2", - Size: 100, - }, - } - - cases := []struct { - Entries []*allocdir.AllocFileInfo - DesiredIdx int64 - DesiredOffset int64 - Task string - LogType string - ExpectedFile string - ExpectedIdx int64 - ExpectedOffset int64 - Error bool - }{ - // Test error cases - { - Entries: nil, - DesiredIdx: 0, - Task: task, - LogType: "stdout", - Error: true, - }, - { - Entries: entries[0:3], - DesiredIdx: 0, - Task: task, - LogType: "stderr", - Error: true, - }, - - // Test beginning cases - { - Entries: entries, - DesiredIdx: 0, - Task: task, - LogType: "stdout", - ExpectedFile: entries[0].Name, - ExpectedIdx: 0, - }, - { - // Desired offset should be ignored at edges - Entries: entries, - DesiredIdx: 0, - DesiredOffset: -100, - Task: task, - LogType: "stdout", - ExpectedFile: entries[0].Name, - ExpectedIdx: 0, - ExpectedOffset: 0, - }, - { - // Desired offset should be ignored at edges - Entries: entries, - DesiredIdx: 1, - DesiredOffset: -1000, - Task: task, - LogType: "stdout", - ExpectedFile: entries[0].Name, - ExpectedIdx: 0, - ExpectedOffset: 0, - }, - { - Entries: entries, - DesiredIdx: 0, - Task: task, - LogType: "stderr", - ExpectedFile: entries[4].Name, - ExpectedIdx: 0, - }, - { - Entries: entries, - DesiredIdx: 0, - Task: task, - LogType: "stdout", - ExpectedFile: entries[0].Name, - ExpectedIdx: 0, - }, - - // Test middle cases - { - Entries: entries, - DesiredIdx: 1, - Task: task, - LogType: "stdout", - ExpectedFile: entries[1].Name, - ExpectedIdx: 1, - }, - { - Entries: entries, - DesiredIdx: 1, - DesiredOffset: 10, - Task: task, - LogType: "stdout", - ExpectedFile: entries[1].Name, - ExpectedIdx: 1, - ExpectedOffset: 10, - }, - { - Entries: entries, - DesiredIdx: 1, - DesiredOffset: 110, - Task: task, - LogType: "stdout", - ExpectedFile: entries[2].Name, - ExpectedIdx: 2, - ExpectedOffset: 10, - }, - { - Entries: entries, - DesiredIdx: 1, - Task: task, - LogType: "stderr", - ExpectedFile: entries[5].Name, - ExpectedIdx: 1, - }, - // Test end cases - { - Entries: entries, - DesiredIdx: math.MaxInt64, - Task: task, - LogType: "stdout", - ExpectedFile: entries[3].Name, - ExpectedIdx: 3, - }, - { - Entries: entries, - DesiredIdx: math.MaxInt64, - DesiredOffset: math.MaxInt64, - Task: task, - LogType: "stdout", - ExpectedFile: entries[3].Name, - ExpectedIdx: 3, - ExpectedOffset: 100, - }, - { - Entries: entries, - DesiredIdx: math.MaxInt64, - DesiredOffset: -10, - Task: task, - LogType: "stdout", - ExpectedFile: entries[3].Name, - ExpectedIdx: 3, - ExpectedOffset: 90, - }, - { - Entries: entries, - DesiredIdx: math.MaxInt64, - Task: task, - LogType: "stderr", - ExpectedFile: entries[6].Name, - ExpectedIdx: 2, - }, - } - - for i, c := range cases { - entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType) - if err != nil { - if !c.Error { - t.Fatalf("case %d: Unexpected error: %v", i, err) - } - continue - } - - if entry.Name != c.ExpectedFile { - t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile) - } - if idx != c.ExpectedIdx { - t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx) - } - if offset != c.ExpectedOffset { - t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset) - } - } -} */ diff --git a/demo/vagrant/client1.hcl b/demo/vagrant/client1.hcl index 845a4ebe6..955a9b2ef 100644 --- a/demo/vagrant/client1.hcl +++ b/demo/vagrant/client1.hcl @@ -12,6 +12,10 @@ client { # this should be like "nomad.service.consul:4647" and a system # like Consul used for service discovery. servers = ["127.0.0.1:4647"] + + options { + "driver.raw_exec.enable" = "true" + } } # Modify our port to avoid a collision with server1 diff --git a/demo/vagrant/client2.hcl b/demo/vagrant/client2.hcl index 963546c4b..f5fdeb686 100644 --- a/demo/vagrant/client2.hcl +++ b/demo/vagrant/client2.hcl @@ -12,6 +12,10 @@ client { # this should be like "nomad.service.consul:4647" and a system # like Consul used for service discovery. servers = ["127.0.0.1:4647"] + + options { + "driver.raw_exec.enable" = "true" + } } # Modify our port to avoid a collision with server1 and client1 diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index 05f0f86af..6d5081f16 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -96,7 +96,7 @@ func (f *FileSystem) Logs(conn io.ReadWriteCloser) { return } - Bridge(conn, srvConn) + structs.Bridge(conn, srvConn) return } @@ -175,6 +175,6 @@ func (f *FileSystem) Logs(conn io.ReadWriteCloser) { return } - Bridge(conn, clientConn) + structs.Bridge(conn, clientConn) return } diff --git a/nomad/client_rpc.go b/nomad/client_rpc.go index bfdabf580..01538bc36 100644 --- a/nomad/client_rpc.go +++ b/nomad/client_rpc.go @@ -2,9 +2,7 @@ package nomad import ( "fmt" - "io" "net" - "sync" "time" multierror "github.com/hashicorp/go-multierror" @@ -203,23 +201,3 @@ func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) { return stream, nil } - -// Bridge is used to just link two connections together and copy traffic -func Bridge(a, b io.ReadWriteCloser) error { - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - defer wg.Done() - io.Copy(a, b) - a.Close() - b.Close() - }() - go func() { - defer wg.Done() - io.Copy(b, a) - a.Close() - b.Close() - }() - wg.Wait() - return nil -} diff --git a/nomad/structs/streaming_rpc.go b/nomad/structs/streaming_rpc.go index 60a05a23c..602194531 100644 --- a/nomad/structs/streaming_rpc.go +++ b/nomad/structs/streaming_rpc.go @@ -4,6 +4,7 @@ import ( "errors" "io" "strings" + "sync" ) // TODO(alexdadgar): move to errors.go @@ -64,3 +65,23 @@ func (s *StreamingRpcRegistery) GetHandler(method string) (StreamingRpcHandler, return h, nil } + +// Bridge is used to just link two connections together and copy traffic +func Bridge(a, b io.ReadWriteCloser) error { + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + io.Copy(a, b) + a.Close() + b.Close() + }() + go func() { + defer wg.Done() + io.Copy(b, a) + a.Close() + b.Close() + }() + wg.Wait() + return nil +}