diff --git a/client/client.go b/client/client.go index e5329e2a8..1dc8d19b8 100644 --- a/client/client.go +++ b/client/client.go @@ -238,6 +238,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg DiskUsageThreshold: cfg.GCDiskUsageThreshold, InodeUsageThreshold: cfg.GCInodeUsageThreshold, Interval: cfg.GCInterval, + ParallelDestroys: cfg.GCParallelDestroys, ReservedDiskMB: cfg.Node.Reserved.DiskMB, } c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, gcConfig) @@ -1832,10 +1833,11 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error { delete(c.allocs, alloc.ID) c.allocLock.Unlock() - // Remove the allocrunner from garbage collector - c.garbageCollector.Remove(ar) + // Ensure the GC has a reference and then collect. Collecting through the GC + // applies rate limiting + c.garbageCollector.MarkForCollection(ar) + go c.garbageCollector.Collect(alloc.ID) - ar.Destroy() return nil } diff --git a/client/config/config.go b/client/config/config.go index 02f17c873..6924fd722 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -155,12 +155,16 @@ type Config struct { // collection GCInterval time.Duration - // GCDiskUsageThreshold is the disk usage threshold beyond which the Nomad - // client triggers GC of terminal allocations + // GCParallelDestroys is the number of parallel destroys the garbage + // collector will allow. + GCParallelDestroys int + + // GCDiskUsageThreshold is the disk usage threshold given as a percent + // beyond which the Nomad client triggers GC of terminal allocations GCDiskUsageThreshold float64 - // GCInodeUsageThreshold is the inode usage threshold beyond which the Nomad - // client triggers GC of the terminal allocations + // GCInodeUsageThreshold is the inode usage threshold given as a percent + // beyond which the Nomad client triggers GC of the terminal allocations GCInodeUsageThreshold float64 // LogLevel is the level of the logs to putout @@ -194,6 +198,7 @@ func DefaultConfig() *Config { TLSConfig: &config.TLSConfig{}, LogLevel: "DEBUG", GCInterval: 1 * time.Minute, + GCParallelDestroys: 2, GCDiskUsageThreshold: 80, GCInodeUsageThreshold: 70, } diff --git a/client/gc.go b/client/gc.go index 5861dee06..a07db1415 100644 --- a/client/gc.go +++ b/client/gc.go @@ -16,6 +16,271 @@ const ( MB = 1024 * 1024 ) +// GCConfig allows changing the behaviour of the garbage collector +type GCConfig struct { + DiskUsageThreshold float64 + InodeUsageThreshold float64 + Interval time.Duration + ReservedDiskMB int + ParallelDestroys int +} + +// AllocGarbageCollector garbage collects terminated allocations on a node +type AllocGarbageCollector struct { + allocRunners *IndexedGCAllocPQ + statsCollector stats.NodeStatsCollector + config *GCConfig + logger *log.Logger + destroyCh chan struct{} + shutdownCh chan struct{} +} + +// NewAllocGarbageCollector returns a garbage collector for terminated +// allocations on a node. +func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, config *GCConfig) *AllocGarbageCollector { + // Require at least 1 to make progress + if config.ParallelDestroys <= 0 { + logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys) + config.ParallelDestroys = 1 + } + + gc := &AllocGarbageCollector{ + allocRunners: NewIndexedGCAllocPQ(), + statsCollector: statsCollector, + config: config, + logger: logger, + destroyCh: make(chan struct{}, config.ParallelDestroys), + shutdownCh: make(chan struct{}), + } + + go gc.run() + return gc +} + +func (a *AllocGarbageCollector) run() { + ticker := time.NewTicker(a.config.Interval) + for { + select { + case <-ticker.C: + if err := a.keepUsageBelowThreshold(); err != nil { + a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err) + } + case <-a.shutdownCh: + ticker.Stop() + return + } + } +} + +// keepUsageBelowThreshold collects disk usage information and garbage collects +// allocations to make disk space available. +func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { + for { + select { + case <-a.shutdownCh: + return nil + default: + } + + // Check if we have enough free space + err := a.statsCollector.Collect() + if err != nil { + return err + } + + // See if we are below thresholds for used disk space and inode usage + // TODO(diptanu) figure out why this is nil + stats := a.statsCollector.Stats() + if stats == nil { + break + } + + diskStats := stats.AllocDirStats + if diskStats == nil { + break + } + + if diskStats.UsedPercent <= a.config.DiskUsageThreshold && + diskStats.InodesUsedPercent <= a.config.InodeUsageThreshold { + break + } + + // Collect an allocation + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.allocRunner + alloc := ar.Alloc() + a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) + + // Destroy the alloc runner and wait until it exits + a.destroyAllocRunner(ar) + } + return nil +} + +// destroyAllocRunner is used to destroy an allocation runner. It will acquire a +// lock to restrict parallelism and then destroy the alloc runner, returning +// once the allocation has been destroyed. +func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) { + // Acquire the destroy lock + select { + case <-a.shutdownCh: + return + case a.destroyCh <- struct{}{}: + } + + ar.Destroy() + + select { + case <-ar.WaitCh(): + case <-a.shutdownCh: + } + + a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID) + + // Release the lock + <-a.destroyCh +} + +func (a *AllocGarbageCollector) Stop() { + close(a.shutdownCh) +} + +// Collect garbage collects a single allocation on a node +func (a *AllocGarbageCollector) Collect(allocID string) error { + gcAlloc, err := a.allocRunners.Remove(allocID) + if err != nil { + return fmt.Errorf("unable to collect allocation %q: %v", allocID, err) + } + + ar := gcAlloc.allocRunner + a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID) + + a.destroyAllocRunner(ar) + return nil +} + +// CollectAll garbage collects all termianated allocations on a node +func (a *AllocGarbageCollector) CollectAll() error { + for { + select { + case <-a.shutdownCh: + return nil + default: + } + + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.allocRunner + a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID) + go a.destroyAllocRunner(ar) + } + return nil +} + +// MakeRoomFor garbage collects enough number of allocations in the terminal +// state to make room for new allocations +func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error { + totalResource := &structs.Resources{} + for _, alloc := range allocations { + if err := totalResource.Add(alloc.Resources); err != nil { + return err + } + } + + // If the host has enough free space to accomodate the new allocations then + // we don't need to garbage collect terminated allocations + if hostStats := a.statsCollector.Stats(); hostStats != nil { + var availableForAllocations uint64 + if hostStats.AllocDirStats.Available < uint64(a.config.ReservedDiskMB*MB) { + availableForAllocations = 0 + } else { + availableForAllocations = hostStats.AllocDirStats.Available - uint64(a.config.ReservedDiskMB*MB) + } + if uint64(totalResource.DiskMB*MB) < availableForAllocations { + return nil + } + } + + var diskCleared int + for { + select { + case <-a.shutdownCh: + return nil + default: + } + + // Collect host stats and see if we still need to remove older + // allocations + var allocDirStats *stats.DiskStats + if err := a.statsCollector.Collect(); err == nil { + if hostStats := a.statsCollector.Stats(); hostStats != nil { + allocDirStats = hostStats.AllocDirStats + } + } + + if allocDirStats != nil { + if allocDirStats.Available >= uint64(totalResource.DiskMB*MB) { + break + } + } else { + // Falling back to a simpler model to know if we have enough disk + // space if stats collection fails + if diskCleared >= totalResource.DiskMB { + break + } + } + + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.allocRunner + alloc := ar.Alloc() + a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) + + // Destroy the alloc runner and wait until it exits + a.destroyAllocRunner(ar) + + // Call stats collect again + diskCleared += alloc.Resources.DiskMB + } + return nil +} + +// MarkForCollection starts tracking an allocation for Garbage Collection +func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { + if ar == nil { + return fmt.Errorf("nil allocation runner inserted for garbage collection") + } + if ar.Alloc() == nil { + a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting") + a.destroyAllocRunner(ar) + } + + a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) + return a.allocRunners.Push(ar) +} + +// Remove removes an alloc runner without garbage collecting it +func (a *AllocGarbageCollector) Remove(ar *AllocRunner) { + if ar == nil || ar.Alloc() == nil { + return + } + + alloc := ar.Alloc() + if _, err := a.allocRunners.Remove(alloc.ID); err == nil { + a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID) + } +} + // GCAlloc wraps an allocation runner and an index enabling it to be used within // a PQ type GCAlloc struct { @@ -78,7 +343,8 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { alloc := ar.Alloc() if _, ok := i.index[alloc.ID]; ok { - return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID) + // No work to do + return nil } gcAlloc := &GCAlloc{ timeStamp: time.Now(), @@ -121,224 +387,3 @@ func (i *IndexedGCAllocPQ) Length() int { return len(i.heap) } - -// GCConfig allows changing the behaviour of the garbage collector -type GCConfig struct { - DiskUsageThreshold float64 - InodeUsageThreshold float64 - Interval time.Duration - ReservedDiskMB int -} - -// AllocGarbageCollector garbage collects terminated allocations on a node -type AllocGarbageCollector struct { - allocRunners *IndexedGCAllocPQ - statsCollector stats.NodeStatsCollector - config *GCConfig - logger *log.Logger - shutdownCh chan struct{} -} - -// NewAllocGarbageCollector returns a garbage collector for terminated -// allocations on a node. -func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, config *GCConfig) *AllocGarbageCollector { - gc := &AllocGarbageCollector{ - allocRunners: NewIndexedGCAllocPQ(), - statsCollector: statsCollector, - config: config, - logger: logger, - shutdownCh: make(chan struct{}), - } - go gc.run() - - return gc -} - -func (a *AllocGarbageCollector) run() { - ticker := time.NewTicker(a.config.Interval) - for { - select { - case <-ticker.C: - if err := a.keepUsageBelowThreshold(); err != nil { - a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err) - } - case <-a.shutdownCh: - ticker.Stop() - return - } - } -} - -// keepUsageBelowThreshold collects disk usage information and garbage collects -// allocations to make disk space available. -func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { - for { - // Check if we have enough free space - err := a.statsCollector.Collect() - if err != nil { - return err - } - - // See if we are below thresholds for used disk space and inode usage - // TODO(diptanu) figure out why this is nil - stats := a.statsCollector.Stats() - if stats == nil { - break - } - - diskStats := stats.AllocDirStats - if diskStats == nil { - break - } - - if diskStats.UsedPercent <= a.config.DiskUsageThreshold && - diskStats.InodesUsedPercent <= a.config.InodeUsageThreshold { - break - } - - // Collect an allocation - gcAlloc := a.allocRunners.Pop() - if gcAlloc == nil { - break - } - - ar := gcAlloc.allocRunner - alloc := ar.Alloc() - a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) - - // Destroy the alloc runner and wait until it exits - ar.Destroy() - select { - case <-ar.WaitCh(): - case <-a.shutdownCh: - } - } - return nil -} - -func (a *AllocGarbageCollector) Stop() { - close(a.shutdownCh) -} - -// Collect garbage collects a single allocation on a node -func (a *AllocGarbageCollector) Collect(allocID string) error { - gcAlloc, err := a.allocRunners.Remove(allocID) - if err != nil { - return fmt.Errorf("unable to collect allocation %q: %v", allocID, err) - } - - ar := gcAlloc.allocRunner - a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID) - ar.Destroy() - - return nil -} - -// CollectAll garbage collects all termianated allocations on a node -func (a *AllocGarbageCollector) CollectAll() error { - for { - gcAlloc := a.allocRunners.Pop() - if gcAlloc == nil { - break - } - ar := gcAlloc.allocRunner - a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID) - ar.Destroy() - } - return nil -} - -// MakeRoomFor garbage collects enough number of allocations in the terminal -// state to make room for new allocations -func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error { - totalResource := &structs.Resources{} - for _, alloc := range allocations { - if err := totalResource.Add(alloc.Resources); err != nil { - return err - } - } - - // If the host has enough free space to accomodate the new allocations then - // we don't need to garbage collect terminated allocations - if hostStats := a.statsCollector.Stats(); hostStats != nil { - var availableForAllocations uint64 - if hostStats.AllocDirStats.Available < uint64(a.config.ReservedDiskMB*MB) { - availableForAllocations = 0 - } else { - availableForAllocations = hostStats.AllocDirStats.Available - uint64(a.config.ReservedDiskMB*MB) - } - if uint64(totalResource.DiskMB*MB) < availableForAllocations { - return nil - } - } - - var diskCleared int - for { - // Collect host stats and see if we still need to remove older - // allocations - var allocDirStats *stats.DiskStats - if err := a.statsCollector.Collect(); err == nil { - if hostStats := a.statsCollector.Stats(); hostStats != nil { - allocDirStats = hostStats.AllocDirStats - } - } - - if allocDirStats != nil { - if allocDirStats.Available >= uint64(totalResource.DiskMB*MB) { - break - } - } else { - // Falling back to a simpler model to know if we have enough disk - // space if stats collection fails - if diskCleared >= totalResource.DiskMB { - break - } - } - - gcAlloc := a.allocRunners.Pop() - if gcAlloc == nil { - break - } - - ar := gcAlloc.allocRunner - alloc := ar.Alloc() - a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) - - // Destroy the alloc runner and wait until it exits - ar.Destroy() - select { - case <-ar.WaitCh(): - case <-a.shutdownCh: - } - - // Call stats collect again - diskCleared += alloc.Resources.DiskMB - } - return nil -} - -// MarkForCollection starts tracking an allocation for Garbage Collection -func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { - if ar == nil { - return fmt.Errorf("nil allocation runner inserted for garbage collection") - } - if ar.Alloc() == nil { - a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting") - ar.Destroy() - } - - a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) - return a.allocRunners.Push(ar) -} - -// Remove removes an alloc runner without garbage collecting it -func (a *AllocGarbageCollector) Remove(ar *AllocRunner) { - if ar == nil || ar.Alloc() == nil { - return - } - - alloc := ar.Alloc() - if _, err := a.allocRunners.Remove(alloc.ID); err == nil { - a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID) - } -} diff --git a/command/agent/agent.go b/command/agent/agent.go index 61d1985c0..4c9ca7d00 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -310,6 +310,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) { // Set the GC related configs conf.GCInterval = a.config.Client.GCInterval + conf.GCParallelDestroys = a.config.Client.GCParallelDestroys conf.GCDiskUsageThreshold = a.config.Client.GCDiskUsageThreshold conf.GCInodeUsageThreshold = a.config.Client.GCInodeUsageThreshold conf.NoHostUUID = a.config.Client.NoHostUUID diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl index 8974fdf35..28d71e64a 100644 --- a/command/agent/config-test-fixtures/basic.hcl +++ b/command/agent/config-test-fixtures/basic.hcl @@ -54,6 +54,7 @@ client { collection_interval = "5s" } gc_interval = "6s" + gc_parallel_destroys = 6 gc_disk_usage_threshold = 82 gc_inode_usage_threshold = 91 no_host_uuid = true diff --git a/command/agent/config.go b/command/agent/config.go index ccb5db560..ce14d03b6 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -202,6 +202,10 @@ type ClientConfig struct { // collection GCInterval time.Duration `mapstructure:"gc_interval"` + // GCParallelDestroys is the number of parallel destroys the garbage + // collector will allow. + GCParallelDestroys int `mapstructure:"gc_parallel_destroys"` + // GCInodeUsageThreshold is the inode usage threshold beyond which the Nomad // client triggers GC of the terminal allocations GCDiskUsageThreshold float64 `mapstructure:"gc_disk_usage_threshold"` @@ -524,6 +528,7 @@ func DefaultConfig() *Config { ClientMaxPort: 14512, Reserved: &Resources{}, GCInterval: 1 * time.Minute, + GCParallelDestroys: 2, GCInodeUsageThreshold: 70, GCDiskUsageThreshold: 80, }, @@ -929,6 +934,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { if b.GCInterval != 0 { result.GCInterval = b.GCInterval } + if b.GCParallelDestroys != 0 { + result.GCParallelDestroys = b.GCParallelDestroys + } if b.GCDiskUsageThreshold != 0 { result.GCDiskUsageThreshold = b.GCDiskUsageThreshold } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index ea2e4e0fa..f2bda68f1 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -344,6 +344,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error { "gc_interval", "gc_disk_usage_threshold", "gc_inode_usage_threshold", + "gc_parallel_destroys", "no_host_uuid", } if err := checkHCLKeys(listVal, valid); err != nil { diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index f0da05e7f..db5dca331 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -71,6 +71,7 @@ func TestConfig_Parse(t *testing.T) { ParsedReservedPorts: []int{1, 10, 11, 12, 100}, }, GCInterval: 6 * time.Second, + GCParallelDestroys: 6, GCDiskUsageThreshold: 82, GCInodeUsageThreshold: 91, NoHostUUID: true, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 9cce09e24..ae5f56533 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -212,6 +212,7 @@ func TestConfig_Merge(t *testing.T) { ParsedReservedPorts: []int{1, 2, 3}, }, GCInterval: 6 * time.Second, + GCParallelDestroys: 6, GCDiskUsageThreshold: 71, GCInodeUsageThreshold: 86, }, diff --git a/website/source/docs/agent/configuration/client.html.md b/website/source/docs/agent/configuration/client.html.md index 77aa64e77..025107d29 100644 --- a/website/source/docs/agent/configuration/client.html.md +++ b/website/source/docs/agent/configuration/client.html.md @@ -92,6 +92,10 @@ client { - `gc_inode_usage_threshold` `(float: 70)` - Specifies the inode usage percent which Nomad tries to maintain by garbage collecting terminal allocations. +- `gc_parallel_destroys` `(int: 2)` - Specifies the maximum number of + parallel destroys allowed by the garbage collector. This value should be + relatively low to avoid high resource usage during garbage collections. + - `no_host_uuid` `(bool: false)` - Force the UUID generated by the client to be randomly generated and not be based on the host's UUID.