diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index f2f05873c..b810954df 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -80,11 +80,9 @@ func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *co return } - if sendErr := encoder.Encode(&cstructs.StreamErrWrapper{ + encoder.Encode(&cstructs.StreamErrWrapper{ Error: cstructs.NewRpcError(err, code), - }); sendErr != nil { - f.c.logger.Printf("[WARN] client.fs_endpoint: failed to send error %q: %v", err, sendErr) - } + }) } // Stats is used to retrieve the Clients stats. @@ -212,6 +210,20 @@ func (f *FileSystem) Logs(conn io.ReadWriteCloser) { } }() + // Create a goroutine to detect the remote side closing + go func() { + for { + if _, err := conn.Read(nil); err != nil { + if err == io.EOF { + f.c.logger.Printf("--------- FileSystem: remote side closed") + cancel() + return + } + errCh <- err + } + } + }() + var streamErr error OUTER: for { @@ -243,6 +255,7 @@ OUTER: } if err := encoder.Encode(resp); err != nil { + f.c.logger.Printf("--------- FileSystem: Err sending payload: %v", err) streamErr = err break OUTER } diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 0e5f1d3bb..842200dc0 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -14,7 +14,6 @@ import ( "time" "github.com/docker/docker/pkg/ioutils" - "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -60,9 +59,9 @@ const ( ) func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if s.agent.client == nil { - return nil, clientNotRunning - } + //if s.agent.client == nil { + //return nil, clientNotRunning + //} var secret string s.parseToken(req, &secret) @@ -70,32 +69,32 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int var namespace string parseNamespace(req, &namespace) - aclObj, err := s.agent.Client().ResolveToken(secret) - if err != nil { - return nil, err - } + //aclObj, err := s.agent.Client().ResolveToken(secret) + //if err != nil { + //return nil, err + //} path := strings.TrimPrefix(req.URL.Path, "/v1/client/fs/") switch { case strings.HasPrefix(path, "ls/"): - if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - return nil, structs.ErrPermissionDenied - } + //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + //return nil, structs.ErrPermissionDenied + //} return s.DirectoryListRequest(resp, req) case strings.HasPrefix(path, "stat/"): - if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - return nil, structs.ErrPermissionDenied - } + //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + //return nil, structs.ErrPermissionDenied + //} return s.FileStatRequest(resp, req) case strings.HasPrefix(path, "readat/"): - if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - return nil, structs.ErrPermissionDenied - } + //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + //return nil, structs.ErrPermissionDenied + //} return s.FileReadAtRequest(resp, req) case strings.HasPrefix(path, "cat/"): - if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { - return nil, structs.ErrPermissionDenied - } + //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { + //return nil, structs.ErrPermissionDenied + //} return s.FileCatRequest(resp, req) //case strings.HasPrefix(path, "stream/"): //if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadFS) { @@ -507,7 +506,7 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // TODO make work for both // Get the client's handler - handler, err := s.agent.Client().ClientStreamingRpcHandler("FileSystem.Logs") + handler, err := s.agent.Server().StreamingRpcHandler("FileSystem.Logs") if err != nil { return nil, err } diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go new file mode 100644 index 000000000..e19eb4f20 --- /dev/null +++ b/nomad/client_fs_endpoint.go @@ -0,0 +1,185 @@ +package nomad + +import ( + "errors" + "fmt" + "io" + "strings" + "sync" + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/pool" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" +) + +// FileSystem endpoint is used for accessing the logs and filesystem of +// allocations from a Node. +type FileSystem struct { + srv *Server +} + +func (f *FileSystem) Register() { + f.srv.streamingRpcs.Register("FileSystem.Logs", f.Logs) +} + +func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *codec.Encoder) { + // Nothing to do as the conn is closed + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + + // Attempt to send the error + encoder.Encode(&cstructs.StreamErrWrapper{ + Error: cstructs.NewRpcError(err, code), + }) +} + +// Stats is used to retrieve the Clients stats. +func (f *FileSystem) Logs(conn io.ReadWriteCloser) { + defer conn.Close() + + f.srv.logger.Printf("--------- Server FileSystem: Logs called") + + // Decode the arguments + var args cstructs.FsLogsRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&args); err != nil { + f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + // TODO + // We only allow stale reads since the only potentially stale information is + // the Node registration and the cost is fairly high for adding another hope + // in the forwarding chain. + //args.QueryOptions.AllowStale = true + + // Potentially forward to a different region. + //if done, err := f.srv.forward("FileSystem.Logs", args, args, reply); done { + //return err + //} + defer metrics.MeasureSince([]string{"nomad", "file_system", "logs"}, time.Now()) + + // Check node read permissions + if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil { + //return err + return + } else if aclObj != nil { + readfs := aclObj.AllowNsOp(args.QueryOptions.Namespace, acl.NamespaceCapabilityReadFS) + logs := aclObj.AllowNsOp(args.QueryOptions.Namespace, acl.NamespaceCapabilityReadLogs) + if !readfs && !logs { + f.handleStreamResultError(structs.ErrPermissionDenied, nil, encoder) + return + } + } + + // Verify the arguments. + if args.AllocID == "" { + f.handleStreamResultError(errors.New("missing AllocID"), helper.Int64ToPtr(400), encoder) + return + } + + // Retrieve the allocation + snap, err := f.srv.State().Snapshot() + if err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + alloc, err := snap.AllocByID(nil, args.AllocID) + if err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + if alloc == nil { + f.handleStreamResultError(fmt.Errorf("unknown alloc ID %q", args.AllocID), helper.Int64ToPtr(404), encoder) + return + } + nodeID := alloc.NodeID + + // Get the connection to the client + state, ok := f.srv.getNodeConn(nodeID) + if !ok { + // Determine the Server that has a connection to the node. + //srv, err := f.srv.serverWithNodeConn(nodeID) + //if err != nil { + //f.handleStreamResultError(err, nil, encoder) + //return + //} + + // TODO Forward streaming + //return s.srv.forwardServer(srv, "ClientStats.Stats", args, reply) + f.srv.logger.Printf("ALEX: No connection to node %q", nodeID) + f.srv.logger.Printf("ALEX: Have: %v", f.srv.connectedNodes()) + return + } + + // TODO Refactor this out into a helper + // Open a new session + stream, err := state.Session.Open() + if err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + defer func() { + f.srv.logger.Printf("ALEX: Closing stream") + if err := stream.Close(); err != nil { + f.srv.logger.Printf("ALEX: Failed to close stream: %v", err) + } + }() + + // Write the RpcNomad byte to set the mode + if _, err := stream.Write([]byte{byte(pool.RpcStreaming)}); err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + // Send the header + outEncoder := codec.NewEncoder(stream, structs.MsgpackHandle) + header := structs.StreamingRpcHeader{ + Method: "FileSystem.Logs", + } + if err := outEncoder.Encode(header); err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + // Send the request. + if err := outEncoder.Encode(args); err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + // TODO Somehow the client keeps streaming + f.srv.logger.Printf("ALEX: STARTED BRIDGING") + Bridge(conn, stream) + f.srv.logger.Printf("ALEX: FINISHED BRIDGING") + return +} + +// Bridge is used to just link two connections together and copy traffic +func Bridge(a, b io.ReadWriteCloser) error { + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + io.Copy(a, b) + a.Close() + b.Close() + }() + go func() { + defer wg.Done() + io.Copy(b, a) + a.Close() + b.Close() + }() + wg.Wait() + return nil +} diff --git a/nomad/client_stats_endpoint.go b/nomad/client_stats_endpoint.go index 352c054e5..cfba97ef1 100644 --- a/nomad/client_stats_endpoint.go +++ b/nomad/client_stats_endpoint.go @@ -93,6 +93,7 @@ func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.Cli if err != nil { return err } + defer stream.Close() // Write the RpcNomad byte to set the mode if _, err := stream.Write([]byte{byte(pool.RpcNomad)}); err != nil { diff --git a/nomad/server.go b/nomad/server.go index 051afb155..704e50a3b 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -226,6 +226,7 @@ type endpoints struct { // Client endpoints ClientStats *ClientStats + FileSystem *FileSystem } // NewServer is used to construct a new Nomad server from the @@ -926,6 +927,10 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { // Client endpoints s.staticEndpoints.ClientStats = &ClientStats{s} + + // Streaming endpoints + s.staticEndpoints.FileSystem = &FileSystem{s} + s.staticEndpoints.FileSystem.Register() } // Register the static handlers @@ -1280,6 +1285,11 @@ func (s *Server) RPC(method string, args interface{}, reply interface{}) error { return codec.Err } +// StreamingRpcHandler is used to make a streaming RPC call. +func (s *Server) StreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) { + return s.streamingRpcs.GetHandler(method) +} + // Stats is used to return statistics for debugging and insight // for various sub-systems func (s *Server) Stats() map[string]map[string]string {