Merge pull request #1845 from hashicorp/f-remove-disk-usage-acct

Remove disk usage enforcement
This commit is contained in:
Alex Dadgar
2016-10-22 19:01:51 -07:00
committed by GitHub
13 changed files with 20 additions and 300 deletions

View File

@@ -247,7 +247,6 @@ const (
TaskNotRestarting = "Not Restarting"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
TaskDiskExceeded = "Disk Exceeded"
TaskVaultRenewalFailed = "Vault token renewal failed"
TaskSiblingFailed = "Sibling task failed"
TaskSignaling = "Signaling"

View File

@@ -26,10 +26,6 @@ const (
// update will transfer all past state information. If no other transition
// has occurred up to this limit, we will send to the server.
taskReceivedSyncLimit = 30 * time.Second
// watchdogInterval is the interval at which resource constraints for the
// allocation are being checked and enforced.
watchdogInterval = 5 * time.Second
)
// AllocStateUpdater is used to update the status of an allocation
@@ -404,7 +400,7 @@ func (r *AllocRunner) Run() {
// Create the execution context
r.ctxLock.Lock()
if r.ctx == nil {
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID), r.Alloc().Resources.DiskMB)
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
if err := allocDir.Build(tg.Tasks); err != nil {
r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
@@ -449,12 +445,6 @@ func (r *AllocRunner) Run() {
}
r.taskLock.Unlock()
// Start watching the shared allocation directory for disk usage
go r.ctx.AllocDir.StartDiskWatcher()
watchdog := time.NewTicker(watchdogInterval)
defer watchdog.Stop()
// taskDestroyEvent contains an event that caused the destruction of a task
// in the allocation.
var taskDestroyEvent *structs.TaskEvent
@@ -480,12 +470,6 @@ OUTER:
for _, tr := range runners {
tr.Update(update)
}
case <-watchdog.C:
if event, desc := r.checkResources(); event != nil {
r.setStatus(structs.AllocClientStatusFailed, desc)
taskDestroyEvent = event
break OUTER
}
case <-r.destroyCh:
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER
@@ -495,9 +479,6 @@ OUTER:
// Kill the task runners
r.destroyTaskRunners(taskDestroyEvent)
// Stop watching the shared allocation directory
r.ctx.AllocDir.StopDiskWatcher()
// Block until we should destroy the state of the alloc
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
@@ -527,18 +508,6 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
r.syncStatus()
}
// checkResources compares the current size of the allocation directory with
// the allocation's disk limit. When the size is above the limit it returns a
// TaskDiskExceeded event annotated with both values together with a
// human-readable reason; otherwise it returns a nil event and an empty string.
func (r *AllocRunner) checkResources() (*structs.TaskEvent, string) {
	used := r.ctx.AllocDir.GetSize()
	limit := r.Alloc().Resources.DiskInBytes()

	// Within budget: nothing to report.
	if used <= limit {
		return nil, ""
	}

	exceeded := structs.NewTaskEvent(structs.TaskDiskExceeded).
		SetDiskLimit(limit).
		SetDiskSize(used)
	return exceeded, "shared allocation directory exceeded the allowed disk space"
}
// handleDestroy blocks till the AllocRunner should be destroyed and does the
// necessary cleanup.
func (r *AllocRunner) handleDestroy() {

View File

@@ -1,7 +1,6 @@
package client
import (
"bufio"
"fmt"
"io/ioutil"
"os"
@@ -209,112 +208,6 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
})
}
// TestAllocRunner_DiskExceeded_Destroy starts an alloc runner with a
// long-running task, writes a 20 MB file into the allocation directory, and
// expects the allocation to move to the failed client status while its state
// file and alloc directory are kept. It then calls Destroy and expects both
// the state file and the alloc directory to be removed.
func TestAllocRunner_DiskExceeded_Destroy(t *testing.T) {
	ctestutil.ExecCompatible(t)
	upd, ar := testAllocRunner(false)

	// Ensure task takes some time
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Config["command"] = "/bin/sleep"
	task.Config["args"] = []string{"60"}
	go ar.Run()

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, fmt.Errorf("No updates")
		}
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Create a 20mb file in the alloc directory, which should cause the
	// allocation to terminate in a failed state.
	name := ar.ctx.AllocDir.AllocDir + "/20mb.bin"
	f, err := os.Create(name)
	if err != nil {
		// Fatalf, not Fatal: the message carries a format verb.
		t.Fatalf("unable to create file: %v", err)
	}
	defer func() {
		if err := f.Close(); err != nil {
			t.Fatalf("unable to close file: %v", err)
		}
		os.Remove(name)
	}()

	// write 20 megabytes (1280 * 16384 bytes) of zeros to the file
	w := bufio.NewWriter(f)
	buf := make([]byte, 16384)
	for i := 0; i < 1280; i++ {
		if _, err := w.Write(buf); err != nil {
			t.Fatalf("unable to write to file: %v", err)
		}
	}
	// Flush so that every buffered byte actually reaches the file before the
	// test starts waiting on the disk usage check.
	if err := w.Flush(); err != nil {
		t.Fatalf("unable to flush file: %v", err)
	}

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, nil
		}

		// Check the status has changed.
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
		}

		// Check the state still exists
		if _, err := os.Stat(ar.stateFilePath()); err != nil {
			return false, fmt.Errorf("state file destroyed: %v", err)
		}

		// Check the alloc directory still exists
		if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
			return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Send the destroy signal and ensure the AllocRunner cleans up.
	ar.Destroy()

	testutil.WaitForResult(func() (bool, error) {
		if upd.Count == 0 {
			return false, nil
		}

		// Check the status has changed.
		last := upd.Allocs[upd.Count-1]
		if last.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
		}

		// Check the state was cleaned
		if _, err := os.Stat(ar.stateFilePath()); err == nil {
			return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		// Check the alloc directory was cleaned
		if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
			return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)

View File

@@ -5,11 +5,8 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
"sync"
"time"
"gopkg.in/tomb.v1"
@@ -19,20 +16,6 @@ import (
"github.com/hpcloud/tail/watch"
)
const (
// The minimum frequency to use for disk monitoring, i.e. the longest
// interval allowed between two disk usage checks.
minCheckDiskInterval = 3 * time.Minute
// The maximum frequency to use for disk monitoring, i.e. the shortest
// interval allowed between two disk usage checks.
maxCheckDiskInterval = 15 * time.Second
// The amount of time that maxCheckDiskInterval is always used after
// starting the allocation. This prevents unbounded disk usage that would
// otherwise be possible for a number of minutes if we started with the
// minCheckDiskInterval.
checkDiskMaxEnforcePeriod = 5 * time.Minute
)
var (
// The name of the directory that is shared across tasks in a task group.
SharedAllocName = "alloc"
@@ -66,33 +49,6 @@ type AllocDir struct {
// TaskDirs is a mapping of task names to their non-shared directory.
TaskDirs map[string]string
// Size is the total consumed disk size of the shared directory in bytes
size int64
sizeLock sync.RWMutex
// The minimum frequency to use for disk monitoring.
MinCheckDiskInterval time.Duration
// The maximum frequency to use for disk monitoring.
MaxCheckDiskInterval time.Duration
// The amount of time that maxCheckDiskInterval is always used after
// starting the allocation. This prevents unbounded disk usage that would
// otherwise be possible for a number of minutes if we started with the
// minCheckDiskInterval.
CheckDiskMaxEnforcePeriod time.Duration
// running reflects the state of the disk watcher process.
running bool
// watchCh signals that the alloc directory is being torn down and that
// any monitoring on it should stop.
watchCh chan struct{}
// MaxSize represents the total amount of megabytes that the shared allocation
// directory is allowed to consume.
MaxSize int
}
// AllocFileInfo holds information about a file inside the AllocDir
@@ -115,15 +71,11 @@ type AllocDirFS interface {
}
// NewAllocDir initializes the AllocDir struct with allocDir as base path for
// the allocation directory and maxSize as the maximum allowed size in megabytes.
func NewAllocDir(allocDir string, maxSize int) *AllocDir {
// the allocation directory.
func NewAllocDir(allocDir string) *AllocDir {
d := &AllocDir{
AllocDir: allocDir,
MaxCheckDiskInterval: maxCheckDiskInterval,
MinCheckDiskInterval: minCheckDiskInterval,
CheckDiskMaxEnforcePeriod: checkDiskMaxEnforcePeriod,
TaskDirs: make(map[string]string),
MaxSize: maxSize,
AllocDir: allocDir,
TaskDirs: make(map[string]string),
}
d.SharedDir = filepath.Join(d.AllocDir, SharedAllocName)
return d
@@ -597,85 +549,6 @@ func (d *AllocDir) pathExists(path string) bool {
return true
}
// GetSize returns the most recently recorded size, in bytes, of the
// allocation directory. The value is only read here, so the shared (read)
// side of sizeLock is taken and concurrent readers do not block each other.
func (d *AllocDir) GetSize() int64 {
	d.sizeLock.RLock()
	defer d.sizeLock.RUnlock()

	return d.size
}
// setSize records size as the current byte count of the allocation
// directory, holding the write side of sizeLock for the assignment.
func (d *AllocDir) setSize(size int64) {
	d.sizeLock.Lock()
	d.size = size
	d.sizeLock.Unlock()
}
// StartDiskWatcher periodically checks the disk space consumed by the shared
// allocation directory. It marks the watcher as running, creates the watch
// channel, and then loops until that channel is closed, so it blocks the
// calling goroutine for the lifetime of the watcher.
//
// After every check the next interval is derived from the ratio between the
// recorded size and MaxSize: the fuller the directory, the sooner the next
// check. The interval is always kept between MaxCheckDiskInterval (shortest)
// and MinCheckDiskInterval (longest), and MaxCheckDiskInterval is used
// unconditionally during the first CheckDiskMaxEnforcePeriod.
func (d *AllocDir) StartDiskWatcher() {
	start := time.Now()

	// Named timer (not sync) so the sync package is not shadowed.
	timer := time.NewTimer(d.MaxCheckDiskInterval)
	defer timer.Stop()

	d.running = true
	d.watchCh = make(chan struct{})

	for {
		select {
		case <-d.watchCh:
			return
		case <-timer.C:
			if err := d.syncDiskUsage(); err != nil {
				log.Printf("[WARN] client: failed to sync disk usage: %v", err)
			}

			// Calculate the disk ratio.
			diskRatio := float64(d.GetSize()) / float64(d.MaxSize*structs.BytesInMegabyte)

			// Exponentially decrease the interval when the disk ratio
			// increases. The value is kept as a float number of whole
			// seconds until it has been clamped: for small ratios it is far
			// larger than a time.Duration can hold (and it is +Inf or NaN
			// when the size or MaxSize is zero), so converting it first
			// would overflow and yield an arbitrary interval.
			seconds := math.Floor(1.0/(0.1*math.Pow(diskRatio, 2)) + 5)

			var nextInterval time.Duration
			switch {
			case time.Since(start) < d.CheckDiskMaxEnforcePeriod,
				math.IsNaN(seconds),
				seconds < d.MaxCheckDiskInterval.Seconds():
				// Use the shortest interval during the initial enforcement
				// period, when the ratio is high, or when the ratio cannot
				// be computed at all.
				nextInterval = d.MaxCheckDiskInterval
			case seconds > d.MinCheckDiskInterval.Seconds():
				// The ratio is low enough to use the longest interval.
				nextInterval = d.MinCheckDiskInterval
			default:
				nextInterval = time.Duration(int64(seconds)) * time.Second
			}

			timer.Reset(nextInterval)
		}
	}
}
// StopDiskWatcher stops disk monitoring by closing the watch channel. It does
// nothing when the watcher is not marked as running, and it clears the running
// flag before closing the channel.
func (d *AllocDir) StopDiskWatcher() {
	if !d.running {
		return
	}

	d.running = false
	close(d.watchCh)
}
// syncDiskUsage recursively walks the allocation directory, sums the sizes
// reported for every entry that could be inspected, and stores the total via
// setSize. It returns whatever error the walk itself reports.
func (d *AllocDir) syncDiskUsage() error {
	var total int64

	// accumulate adds one entry to the running total. Entries that arrive
	// with an error carry no usable FileInfo and are skipped; the walk is
	// never aborted.
	accumulate := func(_ string, fi os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return nil
		}
		total += fi.Size()
		return nil
	}

	result := filepath.Walk(d.AllocDir, accumulate)

	// Store the disk consumption
	d.setSize(total)
	return result
}
func (d *AllocDir) GetSecretDir(task string) (string, error) {
if t, ok := d.TaskDirs[task]; !ok {
return "", fmt.Errorf("Allocation directory doesn't contain task %q", task)

View File

@@ -58,7 +58,7 @@ func TestAllocDir_BuildAlloc(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
tasks := []*structs.Task{t1, t2}
if err := d.Build(tasks); err != nil {
@@ -93,7 +93,7 @@ func TestAllocDir_LogDir(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
expected := filepath.Join(d.AllocDir, SharedAllocName, LogDirName)
@@ -109,7 +109,7 @@ func TestAllocDir_EmbedNonExistent(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
tasks := []*structs.Task{t1, t2}
if err := d.Build(tasks); err != nil {
@@ -131,7 +131,7 @@ func TestAllocDir_EmbedDirs(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
tasks := []*structs.Task{t1, t2}
if err := d.Build(tasks); err != nil {
@@ -192,7 +192,7 @@ func TestAllocDir_MountSharedAlloc(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
tasks := []*structs.Task{t1, t2}
if err := d.Build(tasks); err != nil {
@@ -240,7 +240,7 @@ func TestAllocDir_Snapshot(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
tasks := []*structs.Task{t1, t2}
@@ -299,10 +299,10 @@ func TestAllocDir_Move(t *testing.T) {
defer os.RemoveAll(tmp)
// Create two alloc dirs
d1 := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d1 := NewAllocDir(tmp)
defer d1.Destroy()
d2 := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d2 := NewAllocDir(tmp)
defer d2.Destroy()
tasks := []*structs.Task{t1, t2}
@@ -356,7 +356,7 @@ func TestAllocDir_EscapeChecking(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
d := NewAllocDir(tmp)
defer d.Destroy()
tasks := []*structs.Task{t1, t2}
if err := d.Build(tasks); err != nil {

View File

@@ -1516,7 +1516,7 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
// If the snapshot has ended then we create the previous
// allocdir
if err == io.EOF {
prevAllocDir := allocdir.NewAllocDir(pathToAllocDir, 0)
prevAllocDir := allocdir.NewAllocDir(pathToAllocDir)
return prevAllocDir, nil
}
// If there is an error then we avoid creating the alloc dir

View File

@@ -974,7 +974,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config) (*structs.Task, Driver
Resources: basicResources,
}
allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()), task.Resources.DiskMB)
allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()))
allocDir.Build([]*structs.Task{task})
alloc := mock.Alloc()
execCtx := NewExecContext(allocDir, alloc.ID)

View File

@@ -79,7 +79,7 @@ func testConfig() *config.Config {
func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
cfg := testConfig()
allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()), task.Resources.DiskMB)
allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()))
allocDir.Build([]*structs.Task{task})
alloc := mock.Alloc()
execCtx := NewExecContext(allocDir, alloc.ID)

View File

@@ -37,7 +37,7 @@ func mockAllocDir(t *testing.T) (*structs.Task, *allocdir.AllocDir) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID), task.Resources.DiskMB)
allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID))
if err := allocDir.Build([]*structs.Task{task}); err != nil {
log.Panicf("allocDir.Build() failed: %v", err)
}

View File

@@ -58,7 +58,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas
// we have a mock so that doesn't happen.
task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID), task.Resources.DiskMB)
allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID))
allocDir.Build([]*structs.Task{task})
vclient := vaultclient.NewMockVaultClient()

View File

@@ -449,7 +449,7 @@ func tempAllocDir(t *testing.T) *allocdir.AllocDir {
t.Fatalf("failed to chmod dir: %v", err)
}
return allocdir.NewAllocDir(dir, structs.DefaultResources().DiskMB)
return allocdir.NewAllocDir(dir)
}
type nopWriteCloser struct {

View File

@@ -343,12 +343,6 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
} else {
desc = "Task exceeded restart policy"
}
case api.TaskDiskExceeded:
if event.DiskLimit != 0 && event.DiskSize != 0 {
desc = fmt.Sprintf("Disk size exceeded maximum: %d > %d", event.DiskSize, event.DiskLimit)
} else {
desc = "Task exceeded disk quota"
}
case api.TaskVaultRenewalFailed:
if event.VaultError != "" {
desc = event.VaultError

View File

@@ -2474,9 +2474,6 @@ type TaskEvent struct {
// The maximum allowed task disk size.
DiskLimit int64
// The recorded task disk size.
DiskSize int64
// Name of the sibling task that caused termination of the task that
// the TaskEvent refers to.
FailedSibling string
@@ -2600,11 +2597,6 @@ func (e *TaskEvent) SetDiskLimit(limit int64) *TaskEvent {
return e
}
// SetDiskSize stores size in the event's DiskSize field and returns the same
// event so that setter calls can be chained.
func (e *TaskEvent) SetDiskSize(size int64) *TaskEvent {
e.DiskSize = size
return e
}
func (e *TaskEvent) SetFailedSibling(sibling string) *TaskEvent {
e.FailedSibling = sibling
return e