client: 404 when accessing files for GC'ed alloc (#18232)

When an allocation is garbage collected from the client, but not from
the servers, the API request is routed to the client and the client
does attempt to read the file, but the alloc dir has already been
deleted, resulting in a 500 error.

This happens because the client GC only destroys the alloc runner
(deleting the alloc dir), but it keeps a reference to the alloc runner
until the alloc is garbage collected from the servers as well.

This commit adjusts this logic by checking if the alloc runner (and the
alloc files) has been destroyed, returning a 404 if so.
This commit is contained in:
Luiz Aoqui
2023-08-21 16:09:24 -04:00
committed by GitHub
parent 9fa39eb829
commit 14a38bee7b
3 changed files with 237 additions and 31 deletions

3
.changelog/18232.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
client: return 404 instead of 500 when trying to access logs and files from allocations that have been garbage collected
```

View File

@@ -169,32 +169,42 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&req); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
return
}
if req.AllocID == "" {
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder)
return
}
alloc, err := f.c.GetAlloc(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder)
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
ar, err := f.c.getAllocRunner(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
return
}
if ar.IsDestroyed() {
handleStreamResultError(
fmt.Errorf("state for allocation %s not found on client", req.AllocID),
pointer.Of(int64(http.StatusNotFound)),
encoder,
)
return
}
alloc := ar.Alloc()
// Check read permissions
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
handleStreamResultError(err, pointer.Of(int64(403)), encoder)
handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder)
return
} else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder)
return
}
// Validate the arguments
if req.Path == "" {
handleStreamResultError(pathNotPresentErr, pointer.Of(int64(400)), encoder)
handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
switch req.Origin {
@@ -202,15 +212,15 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
case "":
req.Origin = "start"
default:
handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder)
handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil {
code := pointer.Of(int64(500))
code := pointer.Of(int64(http.StatusInternalServerError))
if structs.IsErrUnknownAllocation(err) {
code = pointer.Of(int64(404))
code = pointer.Of(int64(http.StatusNotFound))
}
handleStreamResultError(err, code, encoder)
@@ -220,13 +230,13 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
// Calculate the offset
fileInfo, err := fs.Stat(req.Path)
if err != nil {
handleStreamResultError(err, pointer.Of(int64(400)), encoder)
handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
if fileInfo.IsDir {
handleStreamResultError(
fmt.Errorf("file %q is a directory", req.Path),
pointer.Of(int64(400)), encoder)
pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
@@ -328,7 +338,7 @@ OUTER:
}
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder)
return
}
}
@@ -344,19 +354,29 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&req); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
return
}
if req.AllocID == "" {
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder)
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
alloc, err := f.c.GetAlloc(req.AllocID)
ar, err := f.c.getAllocRunner(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder)
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
return
}
if ar.IsDestroyed() {
handleStreamResultError(
fmt.Errorf("state for allocation %s not found on client", req.AllocID),
pointer.Of(int64(http.StatusNotFound)),
encoder,
)
return
}
alloc := ar.Alloc()
// Check read permissions
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
@@ -373,13 +393,13 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
// Validate the arguments
if req.Task == "" {
handleStreamResultError(taskNotPresentErr, pointer.Of(int64(400)), encoder)
handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
switch req.LogType {
case "stdout", "stderr":
default:
handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(400)), encoder)
handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
switch req.Origin {
@@ -387,15 +407,15 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
case "":
req.Origin = "start"
default:
handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder)
handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
return
}
fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil {
code := pointer.Of(int64(500))
code := pointer.Of(int64(http.StatusInternalServerError))
if structs.IsErrUnknownAllocation(err) {
code = pointer.Of(int64(404))
code = pointer.Of(int64(http.StatusNotFound))
}
handleStreamResultError(err, code, encoder)
@@ -404,9 +424,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
allocState, err := f.c.GetAllocState(req.AllocID)
if err != nil {
code := pointer.Of(int64(500))
code := pointer.Of(int64(http.StatusInternalServerError))
if structs.IsErrUnknownAllocation(err) {
code = pointer.Of(int64(404))
code = pointer.Of(int64(http.StatusNotFound))
}
handleStreamResultError(err, code, encoder)
@@ -418,7 +438,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
if taskState == nil {
handleStreamResultError(
fmt.Errorf("unknown task name %q", req.Task),
pointer.Of(int64(400)),
pointer.Of(int64(http.StatusBadRequest)),
encoder)
return
}
@@ -426,7 +446,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
if taskState.StartedAt.IsZero() {
handleStreamResultError(
fmt.Errorf("task %q not started yet. No logs available", req.Task),
pointer.Of(int64(404)),
pointer.Of(int64(http.StatusNotFound)),
encoder)
return
}
@@ -512,7 +532,7 @@ OUTER:
if streamErr != nil {
// If error has a Code, use it
var code int64 = 500
var code int64 = http.StatusInternalServerError
if codedErr, ok := streamErr.(interface{ Code() int }); ok {
code = int64(codedErr.Code())
}

View File

@@ -9,6 +9,7 @@ import (
"io"
"math"
"net"
"net/http"
"os"
"path/filepath"
"reflect"
@@ -31,6 +32,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@@ -394,6 +396,96 @@ OUTER:
}
}
// TestFS_Stream_GC asserts that reading files from an alloc that has been
// GC'ed from the client returns a 404 error.
func TestFS_Stream_GC(t *testing.T) {
ci.Parallel(t)
// Start a server and client.
s, cleanupS := nomad.TestServer(t, nil)
t.Cleanup(cleanupS)
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { cleanupC() })
job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
// Wait for alloc to be running.
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
// GC alloc from the client.
ar, err := c.getAllocRunner(alloc.ID)
must.NoError(t, err)
c.garbageCollector.MarkForCollection(alloc.ID, ar)
must.True(t, c.CollectAllocation(alloc.ID))
// Build the request.
req := &cstructs.FsStreamRequest{
AllocID: alloc.ID,
Path: "alloc/logs/web.stdout.0",
PlainText: true,
Follow: true,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Get the handler.
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
must.NoError(t, err)
// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
// Start the handler.
go handler(p2)
// Start the decoder.
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// Send the request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.NoError(t, encoder.Encode(req))
for {
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
must.Error(t, msg.Error)
must.ErrorContains(t, msg.Error, "not found on client")
must.Eq(t, http.StatusNotFound, *msg.Error.Code)
return
}
}
}
func TestFS_Stream_ACL(t *testing.T) {
ci.Parallel(t)
@@ -1015,13 +1107,104 @@ func TestFS_Logs_TaskPending(t *testing.T) {
case msg := <-streamMsg:
require.NotNil(msg.Error)
require.NotNil(msg.Error.Code)
require.EqualValues(404, *msg.Error.Code)
require.EqualValues(http.StatusNotFound, *msg.Error.Code)
require.Contains(msg.Error.Message, "not started")
return
}
}
}
// TestFS_Logs_GC asserts that reading logs from an alloc that has been GC'ed
// from the client returns a 404 error.
func TestFS_Logs_GC(t *testing.T) {
ci.Parallel(t)
// Start a server and client.
s, cleanupS := nomad.TestServer(t, nil)
t.Cleanup(cleanupS)
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { cleanupC() })
job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
// Wait for alloc to be running.
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]
// GC alloc from the client.
ar, err := c.getAllocRunner(alloc.ID)
must.NoError(t, err)
c.garbageCollector.MarkForCollection(alloc.ID, ar)
must.True(t, c.CollectAllocation(alloc.ID))
// Build the request.
req := &cstructs.FsLogsRequest{
AllocID: alloc.ID,
Task: job.TaskGroups[0].Tasks[0].Name,
LogType: "stdout",
Origin: "start",
PlainText: true,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Get the handler.
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
must.NoError(t, err)
// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
// Start the handler.
go handler(p2)
// Start the decoder.
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// Send the request.
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.NoError(t, encoder.Encode(req))
for {
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout")
case err := <-errCh:
t.Fatalf("unexpected stream error: %v", err)
case msg := <-streamMsg:
must.Error(t, msg.Error)
must.ErrorContains(t, msg.Error, "not found on client")
must.Eq(t, http.StatusNotFound, *msg.Error.Code)
return
}
}
}
func TestFS_Logs_ACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)