From 13071e98468e5069cfac686452ab3bd853878fda Mon Sep 17 00:00:00 2001 From: Ivo Verberk Date: Thu, 11 Aug 2016 09:20:53 +0200 Subject: [PATCH] Disk resource monitoring and enforcement --- api/tasks.go | 1 + client/alloc_runner.go | 49 ++++++++- client/alloc_runner_test.go | 107 ++++++++++++++++++ client/allocdir/alloc_dir.go | 137 +++++++++++++++++++++++- client/allocdir/alloc_dir_test.go | 10 +- client/driver/driver_test.go | 2 +- client/driver/executor/executor_test.go | 5 +- client/task_runner.go | 26 +++-- client/task_runner_test.go | 16 +-- command/agent/fs_endpoint_test.go | 3 +- nomad/structs/structs.go | 46 +++++++- 11 files changed, 367 insertions(+), 35 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index ab5fbbc52..82266f27c 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -218,6 +218,7 @@ const ( TaskNotRestarting = "Not Restarting" TaskDownloadingArtifacts = "Downloading Artifacts" TaskArtifactDownloadFailed = "Failed Artifact Download" + TaskDiskExceeded = "Disk Exceeded" ) // TaskEvent is an event that effects the state of a task and contains meta-data diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1347407d4..e2705a341 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -25,6 +25,10 @@ const ( // update will transfer all past state information. If not other transition // has occurred up to this limit, we will send to the server. taskReceivedSyncLimit = 30 * time.Second + + // watchdogInterval is the interval at which resource constraints for the + // allocation are being checked and enforced. + watchdogInterval = 5 * time.Second ) // AllocStateUpdater is used to update the status of an allocation @@ -237,6 +241,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation { if r.allocClientStatus != "" || r.allocClientDescription != "" { alloc.ClientStatus = r.allocClientStatus alloc.ClientDescription = r.allocClientDescription + + // Copy over the task states so we don't lose them + r.taskStatusLock.RLock() + alloc.TaskStates = copyTaskStates(r.taskStates) + r.taskStatusLock.RUnlock() + r.allocLock.Unlock() return alloc } @@ -328,7 +338,7 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv for task, tr := range r.tasks { if task != taskName { destroyingTasks = append(destroyingTasks, task) - tr.Destroy() + tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName)) } } if len(destroyingTasks) > 0 { @@ -376,7 +386,7 @@ func (r *AllocRunner) Run() { // Create the execution context r.ctxLock.Lock() if r.ctx == nil { - allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID)) + allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID), r.Alloc().Resources.DiskMB) if err := allocDir.Build(tg.Tasks); err != nil { r.logger.Printf("[WARN] client: failed to build task directories: %v", err) r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) @@ -413,6 +423,16 @@ func (r *AllocRunner) Run() { } r.taskLock.Unlock() + // Start watching the shared allocation directory for disk usage + go r.ctx.AllocDir.StartDiskWatcher() + + watchdog := time.NewTicker(watchdogInterval) + defer watchdog.Stop() + + // taskDestroyEvent contains an event that caused the destroyment of a task + // in the allocation. + var taskDestroyEvent *structs.TaskEvent + OUTER: // Wait for updates for { @@ -425,6 +445,7 @@ OUTER: // Check if we're in a terminal status if update.TerminalStatus() { + taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) break OUTER } @@ -433,7 +454,14 @@ OUTER: for _, tr := range runners { tr.Update(update) } + case <-watchdog.C: + if event, desc := r.checkResources(); event != nil { + r.setStatus(structs.AllocClientStatusFailed, desc) + taskDestroyEvent = event + break OUTER + } case <-r.destroyCh: + taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) break OUTER } } @@ -441,7 +469,7 @@ OUTER: // Destroy each sub-task runners := r.getTaskRunners() for _, tr := range runners { - tr.Destroy() + tr.Destroy(taskDestroyEvent) } // Wait for termination of the task runners @@ -452,11 +480,26 @@ OUTER: // Final state sync r.syncStatus() + // Stop watching the shared allocation directory + r.ctx.AllocDir.StopDiskWatcher() + // Block until we should destroy the state of the alloc r.handleDestroy() r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) } +// checkResources monitors and enforces alloc resource usage. It returns an +// appropriate task event describing why the allocation had to be killed. +func (r *AllocRunner) checkResources() (*structs.TaskEvent, string) { + diskSize := r.ctx.AllocDir.GetSize() + diskLimit := r.Alloc().Resources.DiskInBytes() + if diskSize > diskLimit { + return structs.NewTaskEvent(structs.TaskDiskExceeded).SetDiskLimit(diskLimit).SetDiskSize(diskSize), + "shared allocation directory exceeded the allowed disk space" + } + return nil, "" +} + // handleDestroy blocks till the AllocRunner should be destroyed and does the // necessary cleanup. func (r *AllocRunner) handleDestroy() { diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index f97844799..9bdfcf9e8 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -1,6 +1,7 @@ package client import ( + "bufio" "fmt" "os" "testing" @@ -149,6 +150,112 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { }) } +func TestAllocRunner_DiskExceeded_Destroy(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"60"} + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Create a 20mb file in the shared alloc directory, which should cause the + // allocation to terminate in a failed state. + name := ar.ctx.AllocDir.SharedDir + "/20mb.bin" + f, err := os.Create(name) + if err != nil { + t.Fatal("unable to create file: %v", err) + } + + defer func() { + if err := f.Close(); err != nil { + t.Fatal("unable to close file: %v", err) + } + os.Remove(name) + }() + + // write 20 megabytes (1280 * 16384 bytes) of zeros to the file + w := bufio.NewWriter(f) + buf := make([]byte, 16384) + for i := 0; i < 1280; i++ { + if _, err := w.Write(buf); err != nil { + t.Fatal("unable to write to file: %v", err) + } + } + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusFailed { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed) + } + + // Check the state still exists + if _, err := os.Stat(ar.stateFilePath()); err != nil { + return false, fmt.Errorf("state file destroyed: %v", err) + } + + // Check the alloc directory still exists + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil { + return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Send the destroy signal and ensure the AllocRunner cleans up. + ar.Destroy() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusFailed { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} func TestAllocRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index f9e6c05f9..588f70799 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -4,8 +4,11 @@ import ( "fmt" "io" "io/ioutil" + "log" + "math" "os" "path/filepath" + "sync" "time" "gopkg.in/tomb.v1" @@ -15,6 +18,20 @@ import ( "github.com/hpcloud/tail/watch" ) +const ( + // The minimum frequency to use for disk monitoring. + minCheckDiskInterval = 3 * time.Minute + + // The maximum frequency to use for disk monitoring. + maxCheckDiskInterval = 15 * time.Second + + // The amount of time that maxCheckDiskInterval is always used after + // starting the allocation. This prevents unbounded disk usage that would + // otherwise be possible for a number of minutes if we started with the + // minCheckDiskInterval. + checkDiskMaxEnforcePeriod = 5 * time.Minute +) + var ( // The name of the directory that is shared across tasks in a task group. SharedAllocName = "alloc" @@ -44,6 +61,33 @@ type AllocDir struct { // TaskDirs is a mapping of task names to their non-shared directory. TaskDirs map[string]string + + // Size is the total consumed disk size of the shared directory in bytes + size int64 + sizeLock sync.RWMutex + + // The minimum frequency to use for disk monitoring. + MinCheckDiskInterval time.Duration + + // The maximum frequency to use for disk monitoring. + MaxCheckDiskInterval time.Duration + + // The amount of time that maxCheckDiskInterval is always used after + // starting the allocation. This prevents unbounded disk usage that would + // otherwise be possible for a number of minutes if we started with the + // minCheckDiskInterval. + CheckDiskMaxEnforcePeriod time.Duration + + // running reflects the state of the disk watcher process. + running bool + + // watchCh signals that the alloc directory is being torn down and that + // any monitoring on it should stop. + watchCh chan struct{} + + // maxSize represents the total amount of megabytes that the shared allocation + // directory is allowed to consume. + maxSize int } // AllocFileInfo holds information about a file inside the AllocDir @@ -64,14 +108,24 @@ type AllocDirFS interface { ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) } -func NewAllocDir(allocDir string) *AllocDir { - d := &AllocDir{AllocDir: allocDir, TaskDirs: make(map[string]string)} +// NewAllocDir initializes the AllocDir struct with allocDir as base path for +// the allocation directory and maxSize as the maximum allowed size in megabytes. +func NewAllocDir(allocDir string, maxSize int) *AllocDir { + d := &AllocDir{ + AllocDir: allocDir, + MaxCheckDiskInterval: maxCheckDiskInterval, + MinCheckDiskInterval: minCheckDiskInterval, + CheckDiskMaxEnforcePeriod: checkDiskMaxEnforcePeriod, + TaskDirs: make(map[string]string), + maxSize: maxSize, + } d.SharedDir = filepath.Join(d.AllocDir, SharedAllocName) return d } // Tears down previously build directory structure. func (d *AllocDir) Destroy() error { + // Unmount all mounted shared alloc dirs. var mErr multierror.Error if err := d.UnmountAll(); err != nil { @@ -397,3 +451,82 @@ func (d *AllocDir) pathExists(path string) bool { } return true } + +// GetSize returns the size of the shared allocation directory. +func (d *AllocDir) GetSize() int64 { + d.sizeLock.Lock() + defer d.sizeLock.Unlock() + + return d.size +} + +// setSize sets the size of the shared allocation directory. +func (d *AllocDir) setSize(size int64) { + d.sizeLock.Lock() + defer d.sizeLock.Unlock() + + d.size = size +} + +// StartDiskWatcher periodically checks the disk space consumed by the shared +// allocation directory. +func (d *AllocDir) StartDiskWatcher() { + start := time.Now() + + sync := time.NewTimer(d.MaxCheckDiskInterval) + defer sync.Stop() + + d.running = true + d.watchCh = make(chan struct{}) + + for { + select { + case <-d.watchCh: + return + case <-sync.C: + if err := d.syncDiskUsage(); err != nil { + log.Printf("[WARN] client: failed to sync disk usage: %v", err) + } + // Calculate the disk ratio. + diskRatio := float64(d.size) / float64(d.maxSize*structs.BytesInMegabyte) + + // Exponentially decrease the interval when the disk ratio increases. + nextInterval := time.Duration(int64(1.0/(0.1*math.Pow(diskRatio, 2))+5)) * time.Second + + // Use the maximum interval for the first five minutes or if the + // disk ratio is sufficiently high. Also use the minimum check interval + // if the disk ratio becomes low enough. + if nextInterval < d.MaxCheckDiskInterval || time.Since(start) < d.CheckDiskMaxEnforcePeriod { + nextInterval = d.MaxCheckDiskInterval + } else if nextInterval > d.MinCheckDiskInterval { + nextInterval = d.MinCheckDiskInterval + } + sync.Reset(nextInterval) + } + } +} + +// StopDiskWatcher closes the watch channel which causes the disk monitoring to stop. +func (d *AllocDir) StopDiskWatcher() { + if d.running { + d.running = false + close(d.watchCh) + } +} + +// syncDiskUsage walks the shared allocation directory recursively and +// calculates the total consumed disk space. +func (d *AllocDir) syncDiskUsage() error { + var size int64 + err := filepath.Walk(d.SharedDir, + func(path string, info os.FileInfo, err error) error { + // Ignore paths that do not have a valid FileInfo object + if err == nil { + size += info.Size() + } + return nil + }) + // Store the disk consumption + d.setSize(size) + return err +} diff --git a/client/allocdir/alloc_dir_test.go b/client/allocdir/alloc_dir_test.go index 6f459fff4..aa0f5e49b 100644 --- a/client/allocdir/alloc_dir_test.go +++ b/client/allocdir/alloc_dir_test.go @@ -52,7 +52,7 @@ func TestAllocDir_BuildAlloc(t *testing.T) { } defer os.RemoveAll(tmp) - d := NewAllocDir(tmp) + d := NewAllocDir(tmp, structs.DefaultResources().DiskMB) defer d.Destroy() tasks := []*structs.Task{t1, t2} if err := d.Build(tasks); err != nil { @@ -83,7 +83,7 @@ func TestAllocDir_LogDir(t *testing.T) { } defer os.RemoveAll(tmp) - d := NewAllocDir(tmp) + d := NewAllocDir(tmp, structs.DefaultResources().DiskMB) defer d.Destroy() expected := filepath.Join(d.AllocDir, SharedAllocName, LogDirName) @@ -99,7 +99,7 @@ func TestAllocDir_EmbedNonExistent(t *testing.T) { } defer os.RemoveAll(tmp) - d := NewAllocDir(tmp) + d := NewAllocDir(tmp, structs.DefaultResources().DiskMB) defer d.Destroy() tasks := []*structs.Task{t1, t2} if err := d.Build(tasks); err != nil { @@ -121,7 +121,7 @@ func TestAllocDir_EmbedDirs(t *testing.T) { } defer os.RemoveAll(tmp) - d := NewAllocDir(tmp) + d := NewAllocDir(tmp, structs.DefaultResources().DiskMB) defer d.Destroy() tasks := []*structs.Task{t1, t2} if err := d.Build(tasks); err != nil { @@ -182,7 +182,7 @@ func TestAllocDir_MountSharedAlloc(t *testing.T) { } defer os.RemoveAll(tmp) - d := NewAllocDir(tmp) + d := NewAllocDir(tmp, structs.DefaultResources().DiskMB) defer d.Destroy() tasks := []*structs.Task{t1, t2} if err := d.Build(tasks); err != nil { diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 312f2748b..96c490133 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -78,7 +78,7 @@ func testConfig() *config.Config { func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) { cfg := testConfig() - allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID())) + allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()), task.Resources.DiskMB) allocDir.Build([]*structs.Task{task}) alloc := mock.Alloc() execCtx := NewExecContext(allocDir, alloc.ID) diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go index 6844f9538..50ec6353b 100644 --- a/client/driver/executor/executor_test.go +++ b/client/driver/executor/executor_test.go @@ -11,14 +11,13 @@ import ( "testing" "time" - "github.com/mitchellh/go-ps" - "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/env" "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" tu "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/go-ps" ) var ( @@ -38,7 +37,7 @@ func mockAllocDir(t *testing.T) (*structs.Task, *allocdir.AllocDir) { alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] - allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID)) + allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID), task.Resources.DiskMB) if err := allocDir.Build([]*structs.Task{task}); err != nil { log.Panicf("allocDir.Build() failed: %v", err) } diff --git a/client/task_runner.go b/client/task_runner.go index 89d5f9f14..c15153771 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -65,10 +65,11 @@ type TaskRunner struct { // downloaded artifactsDownloaded bool - destroy bool - destroyCh chan struct{} - destroyLock sync.Mutex - waitCh chan struct{} + destroy bool + destroyCh chan struct{} + destroyLock sync.Mutex + destroyEvent *structs.TaskEvent + waitCh chan struct{} } // taskRunnerState is used to snapshot the state of the task runner @@ -298,7 +299,7 @@ func (r *TaskRunner) validateTask() error { } func (r *TaskRunner) run() { - // Predeclare things so we an jump to the RESTART + // Predeclare things so we can jump to the RESTART var handleEmpty bool var stopCollection chan struct{} @@ -403,6 +404,9 @@ func (r *TaskRunner) run() { // Store that the task has been destroyed and any associated error. r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err)) + // Store that task event that provides context on the task destroy. + r.setState(structs.TaskStateDead, r.destroyEvent) + r.runningLock.Lock() r.running = false r.runningLock.Unlock() @@ -446,8 +450,8 @@ func (r *TaskRunner) run() { destroyed := r.destroy r.destroyLock.Unlock() if destroyed { - r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) + r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message) + r.setState(structs.TaskStateDead, r.destroyEvent) return } @@ -459,7 +463,7 @@ func (r *TaskRunner) run() { } } -// startTask creates the driver and start the task. +// startTask creates the driver and starts the task. func (r *TaskRunner) startTask() error { // Create a driver driver, err := r.createDriver() @@ -616,8 +620,9 @@ func (r *TaskRunner) Update(update *structs.Allocation) { } } -// Destroy is used to indicate that the task context should be destroyed -func (r *TaskRunner) Destroy() { +// Destroy is used to indicate that the task context should be destroyed. The +// event parameter provides a context for the destroy. +func (r *TaskRunner) Destroy(event *structs.TaskEvent) { r.destroyLock.Lock() defer r.destroyLock.Unlock() @@ -625,6 +630,7 @@ func (r *TaskRunner) Destroy() { return } r.destroy = true + r.destroyEvent = event close(r.destroyCh) } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 58c367fa3..aeb3dfcc6 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -55,7 +55,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas // we have a mock so that doesn't happen. task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}} - allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID)) + allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID), task.Resources.DiskMB) allocDir.Build([]*structs.Task{task}) ctx := driver.NewExecContext(allocDir, alloc.ID) @@ -71,7 +71,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) { upd, tr := testTaskRunner(false) tr.MarkReceived() go tr.Run() - defer tr.Destroy() + defer tr.Destroy(structs.TaskKilled) defer tr.ctx.AllocDir.Destroy() select { @@ -138,7 +138,7 @@ func TestTaskRunner_Destroy(t *testing.T) { } // Begin the tear down - tr.Destroy() + tr.Destroy(structs.TaskKilled) select { case <-tr.WaitCh(): @@ -171,7 +171,7 @@ func TestTaskRunner_Update(t *testing.T) { tr.task.Config["command"] = "/bin/sleep" tr.task.Config["args"] = []string{"100"} go tr.Run() - defer tr.Destroy() + defer tr.Destroy(structs.TaskKilled) defer tr.ctx.AllocDir.Destroy() // Update the task definition @@ -225,7 +225,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { tr.task.Config["command"] = "/bin/sleep" tr.task.Config["args"] = []string{"10"} go tr.Run() - defer tr.Destroy() + defer tr.Destroy(structs.TaskKilled) // Snapshot state time.Sleep(2 * time.Second) @@ -240,7 +240,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { t.Fatalf("err: %v", err) } go tr2.Run() - defer tr2.Destroy() + defer tr2.Destroy(structs.TaskKilled) // Destroy and wait testutil.WaitForResult(func() (bool, error) { @@ -272,7 +272,7 @@ func TestTaskRunner_Download_List(t *testing.T) { upd, tr := testTaskRunnerFromAlloc(false, alloc) tr.MarkReceived() go tr.Run() - defer tr.Destroy() + defer tr.Destroy(structs.TaskKilled) defer tr.ctx.AllocDir.Destroy() select { @@ -337,7 +337,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) { upd, tr := testTaskRunnerFromAlloc(true, alloc) tr.MarkReceived() go tr.Run() - defer tr.Destroy() + defer tr.Destroy(structs.TaskKilled) defer tr.ctx.AllocDir.Destroy() select { diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 152832e99..bc1977379 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/ugorji/go/codec" ) @@ -448,7 +449,7 @@ func tempAllocDir(t *testing.T) *allocdir.AllocDir { t.Fatalf("failed to chmod dir: %v", err) } - return allocdir.NewAllocDir(dir) + return allocdir.NewAllocDir(dir, structs.DefaultResources().DiskMB) } type nopWriteCloser struct { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 516003de1..f50cf7f3e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -721,6 +721,10 @@ type Resources struct { Networks []*NetworkResource } +const ( + BytesInMegabyte = 1024 * 1024 +) + // DefaultResources returns the default resources for a task. func DefaultResources() *Resources { return &Resources{ @@ -731,6 +735,11 @@ func DefaultResources() *Resources { } } +// DiskInBytes returns the amount of disk resources in bytes. +func (r *Resources) DiskInBytes() int64 { + return int64(r.DiskMB * BytesInMegabyte) +} + // Merge merges this resource with another resource. func (r *Resources) Merge(other *Resources) { if other.CPU != 0 { @@ -2131,7 +2140,7 @@ func (ts *TaskState) Copy() *TaskState { return copy } -// Failed returns if the task has has failed. +// Failed returns true if the task has has failed. func (ts *TaskState) Failed() bool { l := len(ts.Events) if ts.State != TaskStateDead || l == 0 { @@ -2139,7 +2148,7 @@ func (ts *TaskState) Failed() bool { } switch ts.Events[l-1].Type { - case TaskNotRestarting, TaskArtifactDownloadFailed, TaskFailedValidation: + case TaskDiskExceeded, TaskNotRestarting, TaskArtifactDownloadFailed, TaskFailedValidation: return true default: return false @@ -2201,6 +2210,14 @@ const ( // TaskArtifactDownloadFailed indicates that downloading the artifacts // failed. TaskArtifactDownloadFailed = "Failed Artifact Download" + + // TaskDiskExceeded indicates that one of the tasks in a taskgroup has + // exceeded the requested disk resources. + TaskDiskExceeded = "Disk Resources Exceeded" + + // TaskSiblingFailed indicates that a sibling task in the task group has + // failed. + TaskSiblingFailed = "Sibling task failed" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -2234,6 +2251,16 @@ type TaskEvent struct { // Validation fields ValidationError string // Validation error + + // The maximum allowed task disk size. + DiskLimit int64 + + // The recorded task disk size. + DiskSize int64 + + // Name of the sibling task that caused termination of the task that + // the TaskEvent refers to. + FailedSibling string } func (te *TaskEvent) GoString() string { @@ -2316,6 +2343,21 @@ func (e *TaskEvent) SetKillTimeout(timeout time.Duration) *TaskEvent { return e } +func (e *TaskEvent) SetDiskLimit(limit int64) *TaskEvent { + e.DiskLimit = limit + return e +} + +func (e *TaskEvent) SetDiskSize(size int64) *TaskEvent { + e.DiskSize = size + return e +} + +func (e *TaskEvent) SetFailedSibling(sibling string) *TaskEvent { + e.FailedSibling = sibling + return e +} + // TaskArtifact is an artifact to download before running the task. type TaskArtifact struct { // GetterSource is the source to download an artifact using go-getter