diff --git a/CHANGELOG.md b/CHANGELOG.md index ff0ccfc6f..013f01505 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## 0.7.1 (Unreleased) + +IMPROVEMENTS: + * api: Allocations now track and return modify time in addition to create time. + * cli: Allocation create and modify times are displayed in a human-readable relative format like `6h ago`. + * core: Allow agents to be run in `rpc_upgrade_mode` when migrating a cluster + to TLS rather than changing `heartbeat_grace`. + +BUG FIXES: + + ## 0.7.0 (November 1, 2017) __BACKWARDS INCOMPATIBILITIES:__ diff --git a/api/allocations.go b/api/allocations.go index c01cbfc45..f23b83551 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -95,6 +95,7 @@ type Allocation struct { ModifyIndex uint64 AllocModifyIndex uint64 CreateTime int64 + ModifyTime int64 } // AllocationMetric is used to deserialize allocation metrics. @@ -132,11 +133,12 @@ type AllocationListStub struct { CreateIndex uint64 ModifyIndex uint64 CreateTime int64 + ModifyTime int64 } // AllocDeploymentStatus captures the status of the allocation as part of the // deployment. This can include things like if the allocation has been marked as -// heatlhy. +// healthy. type AllocDeploymentStatus struct { Healthy *bool ModifyIndex uint64 diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 72aefe4b7..1a396ac05 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -76,8 +76,13 @@ type AllocRunner struct { // to call. prevAlloc prevAllocWatcher + // ctx is cancelled with exitFn to cause the alloc to be destroyed + // (stopped and GC'd). ctx context.Context exitFn context.CancelFunc + + // waitCh is closed when the Run method exits. At that point the alloc + // has stopped and been GC'd. waitCh chan struct{} // State related fields @@ -917,11 +922,6 @@ func (r *AllocRunner) handleDestroy() { // state as we wait for a destroy. alloc := r.Alloc() - //TODO(schmichael) updater can cause a GC which can block on this alloc // runner shutting down. Since handleDestroy can be called by Run() we // can't block shutdown here as it would cause a deadlock. - go r.updater(alloc) - // Broadcast and persist state synchronously r.sendBroadcast(alloc) if err := r.saveAllocRunnerState(); err != nil { @@ -935,6 +935,11 @@ func (r *AllocRunner) handleDestroy() { r.logger.Printf("[ERR] client: alloc %q unable to unmount task directories: %v", r.allocID, err) } + // Update the server with the alloc's status -- also marks the alloc as + // being eligible for GC, so from this point on the alloc can be gc'd + // at any time. + r.updater(alloc) + for { select { case <-r.ctx.Done(): @@ -1065,6 +1070,17 @@ func (r *AllocRunner) Destroy() { r.allocBroadcast.Close() } +// IsDestroyed returns true if the AllocRunner is not running and has been +// destroyed (GC'd). +func (r *AllocRunner) IsDestroyed() bool { + select { + case <-r.waitCh: + return true + default: + return false + } +} + // WaitCh returns a channel to wait for termination func (r *AllocRunner) WaitCh() <-chan struct{} { return r.waitCh diff --git a/client/client.go b/client/client.go index 79fbba606..21ce75353 100644 --- a/client/client.go +++ b/client/client.go @@ -124,7 +124,8 @@ type Client struct { // successfully serversDiscoveredCh chan struct{} - // allocs is the current set of allocations + // allocs maps alloc IDs to their AllocRunner. This map includes all + // AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]*AllocRunner allocLock sync.RWMutex @@ -486,15 +487,16 @@ func (c *Client) Stats() map[string]map[string]string { return stats } -// CollectAllocation garbage collects a single allocation -func (c *Client) CollectAllocation(allocID string) error { +// CollectAllocation garbage collects a single allocation on a node. Returns +// true if alloc was found and garbage collected; otherwise false. +func (c *Client) CollectAllocation(allocID string) bool { return c.garbageCollector.Collect(allocID) } // CollectAllAllocs garbage collects all allocations on a node in the terminal // state -func (c *Client) CollectAllAllocs() error { - return c.garbageCollector.CollectAll() +func (c *Client) CollectAllAllocs() { + c.garbageCollector.CollectAll() } // Node returns the locally registered node @@ -721,11 +723,16 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner { return runners } -// NumAllocs returns the number of allocs this client has. Used to +// NumAllocs returns the number of un-GC'd allocs this client has. Used to // fulfill the AllocCounter interface for the GC. func (c *Client) NumAllocs() int { + n := 0 c.allocLock.RLock() - n := len(c.allocs) + for _, a := range c.allocs { + if !a.IsDestroyed() { + n++ + } + } c.allocLock.RUnlock() return n } @@ -1205,6 +1212,7 @@ func (c *Client) updateNodeStatus() error { for _, s := range resp.Servers { addr, err := resolveServer(s.RPCAdvertiseAddr) if err != nil { + c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err) continue } e := endpoint{name: s.RPCAdvertiseAddr, addr: addr} @@ -1234,9 +1242,19 @@ func (c *Client) updateNodeStatus() error { // updateAllocStatus is used to update the status of an allocation func (c *Client) updateAllocStatus(alloc *structs.Allocation) { if alloc.Terminated() { - // Terminated, mark for GC - if ar, ok := c.getAllocRunners()[alloc.ID]; ok { + // Terminated, mark for GC if we're still tracking this alloc + // runner. If it's not being tracked that means the server has + // already GC'd it (see removeAlloc). + c.allocLock.RLock() + ar, ok := c.allocs[alloc.ID] + c.allocLock.RUnlock() + + if ok { c.garbageCollector.MarkForCollection(ar) + + // Trigger a GC in case we're over thresholds and just + // waiting for eligible allocs. + c.garbageCollector.Trigger() } } @@ -1531,9 +1549,7 @@ func (c *Client) runAllocs(update *allocUpdates) { // Remove the old allocations for _, remove := range diff.removed { - if err := c.removeAlloc(remove); err != nil { - c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err) - } + c.removeAlloc(remove) } // Update the existing allocations @@ -1544,6 +1560,11 @@ } } + // Make room for new allocations before running + if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil { + c.logger.Printf("[ERR] client: error making room for new allocations: %v", err) + } + // Start the new allocations for _, add := range diff.added { migrateToken := update.migrateTokens[add.ID] @@ -1552,26 +1573,33 @@ add.ID, err) } } + + // Trigger the GC once more now that new allocs are started that could + // have caused thresholds to be exceeded + c.garbageCollector.Trigger() } -// removeAlloc is invoked when we should remove an allocation -func (c *Client) removeAlloc(alloc *structs.Allocation) error { +// removeAlloc is invoked when we should remove an allocation because it has +// been removed by the server.
+func (c *Client) removeAlloc(alloc *structs.Allocation) { c.allocLock.Lock() ar, ok := c.allocs[alloc.ID] if !ok { c.allocLock.Unlock() c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID) - return nil + return } + + // Stop tracking alloc runner as it's been GC'd by the server delete(c.allocs, alloc.ID) c.allocLock.Unlock() // Ensure the GC has a reference and then collect. Collecting through the GC // applies rate limiting c.garbageCollector.MarkForCollection(ar) - go c.garbageCollector.Collect(alloc.ID) - return nil + // GC immediately since the server has GC'd it + go c.garbageCollector.Collect(alloc.ID) } // updateAlloc is invoked when we should update an allocation @@ -1592,9 +1620,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error { // Check if we already have an alloc runner c.allocLock.Lock() + defer c.allocLock.Unlock() if _, ok := c.allocs[alloc.ID]; ok { c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID) - c.allocLock.Unlock() return nil } @@ -1618,14 +1646,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err) } - // Must release allocLock as GC acquires it to count allocs - c.allocLock.Unlock() - - // Make room for the allocation before running it - if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { - c.logger.Printf("[ERR] client: error making room for allocation: %v", err) - } - go ar.Run() return nil } diff --git a/client/client_test.go b/client/client_test.go index fbd49de2c..3492557f7 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -74,6 +74,9 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) { cb(config) } + // Enable raft as leader if we have bootstrap on + config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap + for i := 10; i >= 0; i-- { ports := freeport.GetT(t, 2) config.RPCAddr = &net.TCPAddr{ @@ -657,7 +660,6 @@ func TestClient_WatchAllocs(t *testing.T) { alloc2.JobID = job.ID alloc2.Job = job - // Insert at zero so they are pulled state := s1.State() if err := state.UpsertJob(100, job); err != nil { t.Fatal(err) @@ -681,23 +683,20 @@ func TestClient_WatchAllocs(t *testing.T) { }) // Delete one allocation - err = state.DeleteEval(103, nil, []string{alloc1.ID}) - if err != nil { + if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil { t.Fatalf("err: %v", err) } // Update the other allocation. Have to make a copy because the allocs are // shared in memory in the test and the modify index would be updated in the // alloc runner. 
- alloc2_2 := new(structs.Allocation) - *alloc2_2 = *alloc2 + alloc2_2 := alloc2.Copy() alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop - err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}) - if err != nil { - t.Fatalf("err: %v", err) + if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil { + t.Fatalf("err upserting stopped alloc: %v", err) } - // One allocations should get de-registered + // One allocation should get GC'd and removed testutil.WaitForResult(func() (bool, error) { c1.allocLock.RLock() num := len(c1.allocs) diff --git a/client/gc.go b/client/gc.go index 728b9c920..a7332ebc4 100644 --- a/client/gc.go +++ b/client/gc.go @@ -28,21 +28,36 @@ type GCConfig struct { ParallelDestroys int } -// AllocCounter is used by AllocGarbageCollector to discover how many -// allocations a node has and is generally fulfilled by the Client. +// AllocCounter is used by AllocGarbageCollector to discover how many un-GC'd +// allocations a client has and is generally fulfilled by the Client. type AllocCounter interface { NumAllocs() int } // AllocGarbageCollector garbage collects terminated allocations on a node type AllocGarbageCollector struct { - allocRunners *IndexedGCAllocPQ + config *GCConfig + + // allocRunners marked for GC + allocRunners *IndexedGCAllocPQ + + // statsCollector for node-based thresholds (e.g. disk) statsCollector stats.NodeStatsCollector - allocCounter AllocCounter - config *GCConfig - logger *log.Logger - destroyCh chan struct{} - shutdownCh chan struct{} + + // allocCounter returns the number of un-GC'd allocs on this node + allocCounter AllocCounter + + // destroyCh is a semaphore for rate limiting concurrent garbage + // collections + destroyCh chan struct{} + + // shutdownCh is closed when the GC's Run method should exit + shutdownCh chan struct{} + + // triggerCh is ticked by the Trigger method to cause a GC + triggerCh chan struct{} + + logger *log.Logger } // NewAllocGarbageCollector returns a garbage collector for terminated @@ -51,7 +66,7 @@ type AllocGarbageCollector struct { func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector { // Require at least 1 to make progress if config.ParallelDestroys <= 0 { - logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys) + logger.Printf("[WARN] client.gc: garbage collector defaulting parallelism to 1 due to invalid input value of %d", config.ParallelDestroys) config.ParallelDestroys = 1 } @@ -63,6 +78,7 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats logger: logger, destroyCh: make(chan struct{}, config.ParallelDestroys), shutdownCh: make(chan struct{}), + triggerCh: make(chan struct{}, 1), } return gc @@ -71,16 +87,28 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats // Run the periodic garbage collector.
func (a *AllocGarbageCollector) Run() { ticker := time.NewTicker(a.config.Interval) + a.logger.Printf("[DEBUG] client.gc: GC'ing every %v", a.config.Interval) for { select { + case <-a.triggerCh: case <-ticker.C: - if err := a.keepUsageBelowThreshold(); err != nil { - a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err) - } case <-a.shutdownCh: ticker.Stop() return } + + if err := a.keepUsageBelowThreshold(); err != nil { + a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err) + } + } +} + +// Force the garbage collector to run. +func (a *AllocGarbageCollector) Trigger() { + select { + case a.triggerCh <- struct{}{}: + default: + // already triggered } } @@ -95,25 +123,16 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { } // Check if we have enough free space - err := a.statsCollector.Collect() - if err != nil { + if err := a.statsCollector.Collect(); err != nil { return err } // See if we are below thresholds for used disk space and inode usage - // TODO(diptanu) figure out why this is nil - stats := a.statsCollector.Stats() - if stats == nil { - break - } - - diskStats := stats.AllocDirStats - if diskStats == nil { - break - } - + diskStats := a.statsCollector.Stats().AllocDirStats reason := "" + liveAllocs := a.allocCounter.NumAllocs() + switch { case diskStats.UsedPercent > a.config.DiskUsageThreshold: reason = fmt.Sprintf("disk usage of %.0f is over gc threshold of %.0f", @@ -121,19 +140,19 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { case diskStats.InodesUsedPercent > a.config.InodeUsageThreshold: reason = fmt.Sprintf("inode usage of %.0f is over gc threshold of %.0f", diskStats.InodesUsedPercent, a.config.InodeUsageThreshold) - case a.numAllocs() > a.config.MaxAllocs: - reason = fmt.Sprintf("number of allocations is over the limit (%d)", a.config.MaxAllocs) + case liveAllocs > a.config.MaxAllocs: + reason = fmt.Sprintf("number of allocations (%d) is over the limit (%d)", liveAllocs, a.config.MaxAllocs) } - // No reason to gc, exit if reason == "" { + // No reason to gc, exit break } // Collect an allocation gcAlloc := a.allocRunners.Pop() if gcAlloc == nil { - a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason) + a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason) break } @@ -151,7 +170,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin if alloc := ar.Alloc(); alloc != nil { id = alloc.ID } - a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason) + a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason) // Acquire the destroy lock select { @@ -167,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin case <-a.shutdownCh: } - a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID) + a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID) // Release the lock <-a.destroyCh @@ -177,41 +196,47 @@ func (a *AllocGarbageCollector) Stop() { close(a.shutdownCh) }
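An editorial aside on the `Trigger` method above: because `triggerCh` is created with capacity 1, `Trigger` never blocks and any number of concurrent trigger requests collapse into a single pending GC pass. A minimal, self-contained sketch of that idiom follows; the names in it are illustrative, not taken from this patch:

```go
package main

import "fmt"

// trigger coalesces wake-up requests onto a capacity-1 channel, mirroring
// AllocGarbageCollector.Trigger: sends never block, and at most one run is
// queued no matter how many callers fire.
type trigger struct{ ch chan struct{} }

func newTrigger() *trigger { return &trigger{ch: make(chan struct{}, 1)} }

// Fire requests a run; it is a no-op if a run is already pending.
func (t *trigger) Fire() {
	select {
	case t.ch <- struct{}{}:
	default: // already triggered
	}
}

func main() {
	t := newTrigger()
	for i := 0; i < 3; i++ {
		t.Fire() // three fires before the worker wakes coalesce into one
	}
	runs := 0
	for {
		select {
		case <-t.ch:
			runs++
		default:
			fmt.Println("pending runs:", runs) // prints: pending runs: 1
			return
		}
	}
}
```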
-// Collect garbage collects a single allocation on a node -func (a *AllocGarbageCollector) Collect(allocID string) error { - gcAlloc, err := a.allocRunners.Remove(allocID) - if err != nil { - return fmt.Errorf("unable to collect allocation %q: %v", allocID, err) +// Collect garbage collects a single allocation on a node. Returns true if + alloc was found and garbage collected; otherwise false. +func (a *AllocGarbageCollector) Collect(allocID string) bool { + if gcAlloc := a.allocRunners.Remove(allocID); gcAlloc != nil { + a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection") + return true } - a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection") - return nil + + a.logger.Printf("[DEBUG] client.gc: alloc %s is invalid or was already garbage collected", allocID) + return false } // CollectAll garbage collects all terminated allocations on a node -func (a *AllocGarbageCollector) CollectAll() error { +func (a *AllocGarbageCollector) CollectAll() { for { select { case <-a.shutdownCh: - return nil + return default: } gcAlloc := a.allocRunners.Pop() if gcAlloc == nil { - break + return } - go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full collection") + go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full node collection") } - return nil } // MakeRoomFor garbage collects enough allocations in the terminal // state to make room for new allocations func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error { + if len(allocations) == 0 { + // Nothing to make room for! + return nil + } + // GC allocs until below the max limit + the new allocations max := a.config.MaxAllocs - len(allocations) - for a.numAllocs() > max { + for a.allocCounter.NumAllocs() > max { select { case <-a.shutdownCh: return nil @@ -227,8 +252,9 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e } // Destroy the alloc runner and wait until it exits - a.destroyAllocRunner(gcAlloc.allocRunner, "new allocations") + a.destroyAllocRunner(gcAlloc.allocRunner, fmt.Sprintf("new allocations and over max (%d)", a.config.MaxAllocs)) } + totalResource := &structs.Resources{} for _, alloc := range allocations { if err := totalResource.Add(alloc.Resources); err != nil { @@ -303,26 +329,9 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) { return } - a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) - a.allocRunners.Push(ar) -} - -// Remove removes an alloc runner without garbage collecting it -func (a *AllocGarbageCollector) Remove(ar *AllocRunner) { - if ar == nil || ar.Alloc() == nil { - return + if a.allocRunners.Push(ar) { + a.logger.Printf("[INFO] client.gc: marking allocation %v for GC", ar.Alloc().ID) } - - alloc := ar.Alloc() - if _, err := a.allocRunners.Remove(alloc.ID); err == nil { - a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID) - } -} - -// numAllocs returns the total number of allocs tracked by the client as well -// as those marked for GC. -func (a *AllocGarbageCollector) numAllocs() int { - return a.allocRunners.Length() + a.allocCounter.NumAllocs() } // GCAlloc wraps an allocation runner and an index enabling it to be used within @@ -381,15 +390,16 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { } } -// Push an alloc runner into the GC queue -func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) { +// Push an alloc runner into the GC queue. Returns true if alloc was added, +// false if the alloc already existed.
+func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) bool { i.pqLock.Lock() defer i.pqLock.Unlock() alloc := ar.Alloc() if _, ok := i.index[alloc.ID]; ok { // No work to do - return + return false } gcAlloc := &GCAlloc{ timeStamp: time.Now(), @@ -397,7 +407,7 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) { } i.index[alloc.ID] = gcAlloc heap.Push(&i.heap, gcAlloc) - return + return true } func (i *IndexedGCAllocPQ) Pop() *GCAlloc { @@ -413,17 +423,18 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc { return gcAlloc } -func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { +// Remove alloc from GC. Returns nil if alloc doesn't exist. +func (i *IndexedGCAllocPQ) Remove(allocID string) *GCAlloc { i.pqLock.Lock() defer i.pqLock.Unlock() if gcAlloc, ok := i.index[allocID]; ok { heap.Remove(&i.heap, gcAlloc.index) delete(i.index, allocID) - return gcAlloc, nil + return gcAlloc } - return nil, fmt.Errorf("alloc %q not present", allocID) + return nil } func (i *IndexedGCAllocPQ) Length() int { diff --git a/client/gc_test.go b/client/gc_test.go index 960fb2a80..ac28239a1 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -1,12 +1,15 @@ package client import ( + "fmt" "testing" "time" + "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) func gcConfig() *GCConfig { @@ -128,9 +131,7 @@ func TestAllocGarbageCollector_Collect(t *testing.T) { close(ar1.waitCh) close(ar2.waitCh) - if err := gc.Collect(ar1.Alloc().ID); err != nil { - t.Fatalf("err: %v", err) - } + gc.Collect(ar1.Alloc().ID) gcAlloc := gc.allocRunners.Pop() if gcAlloc == nil || gcAlloc.allocRunner != ar2 { t.Fatalf("bad gcAlloc: %v", gcAlloc) @@ -147,9 +148,7 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) { gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) - if err := gc.CollectAll(); err != nil { - t.Fatalf("err: %v", err) - } + gc.CollectAll() gcAlloc := gc.allocRunners.Pop() if gcAlloc != nil { t.Fatalf("bad gcAlloc: %v", gcAlloc) @@ -290,40 +289,132 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) } } -func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) { +// TestAllocGarbageCollector_MaxAllocs asserts that when making room for new +// allocs, terminal allocs are GC'd until old_allocs + new_allocs <= limit +func TestAllocGarbageCollector_MaxAllocs(t *testing.T) { t.Parallel() - const ( - liveAllocs = 3 - maxAllocs = 6 - gcAllocs = 4 - gcAllocsLeft = 1 - ) + server, serverAddr := testServer(t, nil) + defer server.Shutdown() + testutil.WaitForLeader(t, server.RPC) - logger := testLogger() - statsCollector := &MockStatsCollector{ - availableValues: []uint64{10 * 1024 * MB}, - usedPercents: []float64{0}, - inodePercents: []float64{0}, - } - allocCounter := &MockAllocCounter{allocs: liveAllocs} - conf := gcConfig() - conf.MaxAllocs = maxAllocs - gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf) + const maxAllocs = 6 + client := testClient(t, func(c *config.Config) { + c.GCMaxAllocs = maxAllocs + c.GCDiskUsageThreshold = 100 + c.GCInodeUsageThreshold = 100 + c.GCParallelDestroys = 1 + c.GCInterval = time.Hour - for i := 0; i < gcAllocs; i++ { - _, ar := testAllocRunnerFromAlloc(mock.Alloc(), false) - close(ar.waitCh) - gc.MarkForCollection(ar) + c.RPCHandler = server + c.Servers = []string{serverAddr} + c.ConsulConfig.ClientAutoJoin = new(bool) // squelch logs + }) + 
defer client.Shutdown() + waitTilNodeReady(client, t) + + callN := 0 + assertAllocs := func(expectedAll, expectedDestroyed int) { + // Wait for allocs to be started + callN++ + client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed) + testutil.WaitForResult(func() (bool, error) { + all, destroyed := 0, 0 + for _, ar := range client.getAllocRunners() { + all++ + if ar.IsDestroyed() { + destroyed++ + } + } + return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf( + "expected %d allocs (found %d); expected %d destroy (found %d)", + expectedAll, all, expectedDestroyed, destroyed, + ) + }, func(err error) { + client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed) + t.Fatalf("%d alloc state: %v", callN, err) + }) + client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed) } - if err := gc.MakeRoomFor([]*structs.Allocation{mock.Alloc(), mock.Alloc()}); err != nil { - t.Fatalf("error making room for 2 new allocs: %v", err) + // Create a job + state := server.State() + job := mock.Job() + job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + job.TaskGroups[0].Tasks[0].Config["run_for"] = "30s" + nodeID := client.Node().ID + if err := state.UpsertJob(98, job); err != nil { + t.Fatalf("error upserting job: %v", err) + } + if err := state.UpsertJobSummary(99, mock.JobSummary(job.ID)); err != nil { + t.Fatalf("error upserting job summary: %v", err) } - // There should be gcAllocsLeft alloc runners left to be collected - if n := len(gc.allocRunners.index); n != gcAllocsLeft { - t.Fatalf("expected %d remaining GC-able alloc runners but found %d", gcAllocsLeft, n) + newAlloc := func() *structs.Allocation { + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.Job = job + alloc.NodeID = nodeID + return alloc } + + // Create the allocations + allocs := make([]*structs.Allocation, 7) + for i := 0; i < len(allocs); i++ { + allocs[i] = newAlloc() + } + + // Upsert a copy of the allocs as modifying the originals later would + // cause a race + { + allocsCopy := make([]*structs.Allocation, len(allocs)) + for i, a := range allocs { + allocsCopy[i] = a.Copy() + } + if err := state.UpsertAllocs(100, allocsCopy); err != nil { + t.Fatalf("error upserting initial allocs: %v", err) + } + } + + // 7 total, 0 GC'd + assertAllocs(7, 0) + + // Set the first few as terminal so they're marked for gc + const terminalN = 4 + for i := 0; i < terminalN; i++ { + // Copy the alloc so the pointers aren't shared + alloc := allocs[i].Copy() + alloc.DesiredStatus = structs.AllocDesiredStatusStop + allocs[i] = alloc + } + if err := state.UpsertAllocs(101, allocs[:terminalN]); err != nil { + t.Fatalf("error upserting stopped allocs: %v", err) + } + + // 7 total, 1 GC'd to get down to limit of 6 + assertAllocs(7, 1) + + // Add one more alloc + if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil { + t.Fatalf("error upserting new alloc: %v", err) + } + + // 8 total, 2 GC'd to get down to limit of 6 + // If this fails it may be due to the gc's Run and MakeRoomFor methods + // gc'ing concurrently. May have to disable gc's run loop if this test + // is flaky.
+ assertAllocs(8, 2) + + // Add new allocs to cause the gc of old terminal ones + newAllocs := make([]*structs.Allocation, 4) + for i := 0; i < len(newAllocs); i++ { + newAllocs[i] = newAlloc() + } + if err := state.UpsertAllocs(200, newAllocs); err != nil { + t.Fatalf("error upserting %d new allocs: %v", len(newAllocs), err) + } + + // 12 total, 4 GC'd total because all other allocs are alive + assertAllocs(12, 4) } func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { @@ -391,39 +482,3 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { t.Fatalf("gcAlloc: %v", gcAlloc) } } - -func TestAllocGarbageCollector_MaxAllocsThreshold(t *testing.T) { - t.Parallel() - const ( - liveAllocs = 3 - maxAllocs = 6 - gcAllocs = 4 - gcAllocsLeft = 1 - ) - - logger := testLogger() - statsCollector := &MockStatsCollector{ - availableValues: []uint64{1000}, - usedPercents: []float64{0}, - inodePercents: []float64{0}, - } - allocCounter := &MockAllocCounter{allocs: liveAllocs} - conf := gcConfig() - conf.MaxAllocs = 4 - gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf) - - for i := 0; i < gcAllocs; i++ { - _, ar := testAllocRunnerFromAlloc(mock.Alloc(), false) - close(ar.waitCh) - gc.MarkForCollection(ar) - } - - if err := gc.keepUsageBelowThreshold(); err != nil { - t.Fatalf("error gc'ing: %v", err) - } - - // We should have gc'd down to MaxAllocs - if n := len(gc.allocRunners.index); n != gcAllocsLeft { - t.Fatalf("expected remaining gc allocs (%d) to equal %d", n, gcAllocsLeft) - } -} diff --git a/client/stats/host.go b/client/stats/host.go index d284973e7..8f0f92377 100644 --- a/client/stats/host.go +++ b/client/stats/host.go @@ -190,9 +190,6 @@ func (h *HostStatsCollector) Stats() *HostStats { // toDiskStats merges UsageStat and PartitionStat to create a DiskStat func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats { - if usage == nil { - return nil - } ds := DiskStats{ Size: usage.Total, Used: usage.Used, diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 7011a3cfc..65bcbd011 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -115,7 +115,8 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request return nil, structs.ErrPermissionDenied } - return nil, s.agent.Client().CollectAllAllocs() + s.agent.Client().CollectAllAllocs() + return nil, nil } func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -131,7 +132,12 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http } else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob) { return nil, structs.ErrPermissionDenied } - return nil, s.agent.Client().CollectAllocation(allocID) + + if !s.agent.Client().CollectAllocation(allocID) { + // Could not find alloc + return nil, fmt.Errorf("unable to collect allocation: not present") + } + return nil, nil } func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl index 188f085be..a96a1e431 100644 --- a/command/agent/config-test-fixtures/basic.hcl +++ b/command/agent/config-test-fixtures/basic.hcl @@ -152,6 +152,7 @@ tls { ca_file = "foo" cert_file = "bar" key_file = "pipe" + rpc_upgrade_mode = true verify_https_client = true } 
sentinel { diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 25b25f8a2..3f42b2ae5 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -771,6 +771,7 @@ func parseTLSConfig(result **config.TLSConfig, list *ast.ObjectList) error { "http", "rpc", "verify_server_hostname", + "rpc_upgrade_mode", "ca_file", "cert_file", "key_file", diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 15ad5dc16..1a824f93a 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -172,6 +172,7 @@ func TestConfig_Parse(t *testing.T) { CAFile: "foo", CertFile: "bar", KeyFile: "pipe", + RPCUpgradeMode: true, VerifyHTTPSClient: true, }, HTTPAPIResponseHeaders: map[string]string{ diff --git a/command/alloc_status.go b/command/alloc_status.go index 2f206f40b..d1e75932e 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -214,6 +214,16 @@ func (c *AllocStatusCommand) Run(args []string) int { } func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength int, verbose bool) (string, error) { + var formattedCreateTime, formattedModifyTime string + + if verbose { + formattedCreateTime = formatUnixNanoTime(alloc.CreateTime) + formattedModifyTime = formatUnixNanoTime(alloc.ModifyTime) + } else { + formattedCreateTime = prettyTimeDiff(time.Unix(0, alloc.CreateTime), time.Now()) + formattedModifyTime = prettyTimeDiff(time.Unix(0, alloc.ModifyTime), time.Now()) + } + basic := []string{ fmt.Sprintf("ID|%s", limit(alloc.ID, uuidLength)), fmt.Sprintf("Eval ID|%s", limit(alloc.EvalID, uuidLength)), @@ -225,7 +235,8 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength fmt.Sprintf("Client Description|%s", alloc.ClientDescription), fmt.Sprintf("Desired Status|%s", alloc.DesiredStatus), fmt.Sprintf("Desired Description|%s", alloc.DesiredDescription), - fmt.Sprintf("Created At|%s", formatUnixNanoTime(alloc.CreateTime)), + fmt.Sprintf("Created|%s", formattedCreateTime), + fmt.Sprintf("Modified|%s", formattedModifyTime), } if alloc.DeploymentID != "" { diff --git a/command/alloc_status_test.go b/command/alloc_status_test.go index f858eaf6b..9be04f3dc 100644 --- a/command/alloc_status_test.go +++ b/command/alloc_status_test.go @@ -128,9 +128,14 @@ func TestAllocStatusCommand_Run(t *testing.T) { t.Fatalf("expected exit 0, got: %d", code) } out := ui.OutputWriter.String() - if !strings.Contains(out, "Created At") { - t.Fatalf("expected to have 'Created At' but saw: %s", out) + if !strings.Contains(out, "Created") { + t.Fatalf("expected to have 'Created' but saw: %s", out) } + + if !strings.Contains(out, "Modified") { + t.Fatalf("expected to have 'Modified' but saw: %s", out) + } + ui.OutputWriter.Reset() if code := cmd.Run([]string{"-address=" + url, "-verbose", allocId1}); code != 0 { @@ -140,8 +145,8 @@ func TestAllocStatusCommand_Run(t *testing.T) { if !strings.Contains(out, allocId1) { t.Fatal("expected to find alloc id in output") } - if !strings.Contains(out, "Created At") { - t.Fatalf("expected to have 'Created At' but saw: %s", out) + if !strings.Contains(out, "Created") { + t.Fatalf("expected to have 'Created' but saw: %s", out) } ui.OutputWriter.Reset() @@ -150,8 +155,8 @@ func TestAllocStatusCommand_Run(t *testing.T) { t.Fatalf("expected exit 0, got: %d", code) } out = ui.OutputWriter.String() - if !strings.Contains(out, "Created At") { - t.Fatalf("expected to have 'Created At' but saw: %s", out) + if !strings.Contains(out, "Created") { 
+ t.Fatalf("expected to have 'Created' but saw: %s", out) } ui.OutputWriter.Reset() diff --git a/command/helpers.go b/command/helpers.go index 13f74ba9d..bf2f9dbbb 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -75,6 +75,116 @@ func formatTimeDifference(first, second time.Time, d time.Duration) string { return second.Truncate(d).Sub(first.Truncate(d)).String() } +// fmtInt formats v into the tail of buf. +// It returns the index where the output begins. +func fmtInt(buf []byte, v uint64) int { + w := len(buf) + for v > 0 { + w-- + buf[w] = byte(v%10) + '0' + v /= 10 + } + return w +} + +// prettyTimeDiff prints a human readable time difference. +// It uses abbreviated forms for each period - s for seconds, m for minutes, h for hours, +// d for days, mo for months, and y for years. Time difference is rounded to the nearest second, +// and the top two least granular periods are returned. For example, if the time difference +// is 10 months, 12 days, 3 hours and 2 seconds, the string "10mo12d" is returned. Zero values return the empty string +func prettyTimeDiff(first, second time.Time) string { + // handle zero values + if first.Second() == 0 { + return "" + } + // round to the nearest second + first = first.Round(time.Second) + second = second.Round(time.Second) + + // calculate time difference in seconds + d := second.Sub(first) + u := uint64(d.Seconds()) + + var buf [32]byte + w := len(buf) + secs := u % 60 + + // track indexes of various periods + var indexes []int + + if secs > 0 { + w-- + buf[w] = 's' + // u is now seconds + w = fmtInt(buf[:w], secs) + indexes = append(indexes, w) + } + u /= 60 + // u is now minutes + if u > 0 { + mins := u % 60 + if mins > 0 { + w-- + buf[w] = 'm' + w = fmtInt(buf[:w], mins) + indexes = append(indexes, w) + } + u /= 60 + // u is now hours + if u > 0 { + hrs := u % 24 + if hrs > 0 { + w-- + buf[w] = 'h' + w = fmtInt(buf[:w], hrs) + indexes = append(indexes, w) + } + u /= 24 + } + // u is now days + if u > 0 { + days := u % 30 + if days > 0 { + w-- + buf[w] = 'd' + w = fmtInt(buf[:w], days) + indexes = append(indexes, w) + } + u /= 30 + } + // u is now months + if u > 0 { + months := u % 12 + if months > 0 { + w-- + buf[w] = 'o' + w-- + buf[w] = 'm' + w = fmtInt(buf[:w], months) + indexes = append(indexes, w) + } + u /= 12 + } + // u is now years + if u > 0 { + w-- + buf[w] = 'y' + w = fmtInt(buf[:w], u) + indexes = append(indexes, w) + } + } + start := w + end := len(buf) + + // truncate to the first two periods + num_periods := len(indexes) + if num_periods > 2 { + end = indexes[num_periods-3] + } + return string(buf[start:end]) + " ago" + +} + // getLocalNodeID returns the node ID of the local Nomad Client and an error if // it couldn't be determined or the Agent is not running in Client mode. 
// getLocalNodeID returns the node ID of the local Nomad Client and an error if // it couldn't be determined or the Agent is not running in Client mode. func getLocalNodeID(client *api.Client) (string, error) { diff --git a/command/helpers_test.go b/command/helpers_test.go index a0a538c8b..617a732f3 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -294,3 +294,35 @@ func TestJobGetter_HTTPServer(t *testing.T) { t.Fatalf("Unexpected file") } } + +func TestPrettyTimeDiff(t *testing.T) { + testCases := []struct { + d time.Duration + exp string + }{ + {-740 * time.Second, "12m20s ago"}, + {-12 * time.Minute, "12m ago"}, + {-60 * time.Minute, "1h ago"}, + {-80 * time.Minute, "1h20m ago"}, + {-6 * time.Hour, "6h ago"}, + {-22165 * time.Second, "6h9m ago"}, + {-100 * time.Hour, "4d4h ago"}, + {-438000 * time.Minute, "10mo4d ago"}, + {-20460 * time.Hour, "2y4mo ago"}, + } + for _, tc := range testCases { + t2 := time.Now().Add(tc.d) + out := prettyTimeDiff(t2, time.Now()) + if out != tc.exp { + t.Fatalf("expected: %v but got: %v", tc.exp, out) + } + } + + var t1 time.Time + out := prettyTimeDiff(t1, time.Now()) + + if out != "" { + t.Fatalf("Expected empty output but got: %v", out) + } + +} diff --git a/command/job_status.go b/command/job_status.go index 0adc69ef3..bf239ff93 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -406,9 +406,9 @@ func formatAllocListStubs(stubs []*api.AllocationListStub, verbose bool, uuidLen allocs := make([]string, len(stubs)+1) if verbose { - allocs[0] = "ID|Eval ID|Node ID|Task Group|Version|Desired|Status|Created At" + allocs[0] = "ID|Eval ID|Node ID|Task Group|Version|Desired|Status|Created|Modified" for i, alloc := range stubs { - allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d|%s|%s|%s", + allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d|%s|%s|%s|%s", limit(alloc.ID, uuidLength), limit(alloc.EvalID, uuidLength), limit(alloc.NodeID, uuidLength), @@ -416,19 +416,23 @@ func formatAllocListStubs(stubs []*api.AllocationListStub, verbose bool, uuidLen alloc.JobVersion, alloc.DesiredStatus, alloc.ClientStatus, - formatUnixNanoTime(alloc.CreateTime)) + formatUnixNanoTime(alloc.CreateTime), + formatUnixNanoTime(alloc.ModifyTime)) } } else { - allocs[0] = "ID|Node ID|Task Group|Version|Desired|Status|Created At" + allocs[0] = "ID|Node ID|Task Group|Version|Desired|Status|Created|Modified" for i, alloc := range stubs { + createTimePretty := prettyTimeDiff(time.Unix(0, alloc.CreateTime), time.Now()) + modTimePretty := prettyTimeDiff(time.Unix(0, alloc.ModifyTime), time.Now()) allocs[i+1] = fmt.Sprintf("%s|%s|%s|%d|%s|%s|%s|%s", limit(alloc.ID, uuidLength), limit(alloc.NodeID, uuidLength), alloc.TaskGroup, alloc.JobVersion, alloc.DesiredStatus, alloc.ClientStatus, - formatUnixNanoTime(alloc.CreateTime)) + createTimePretty, + modTimePretty) } } diff --git a/command/job_status_test.go b/command/job_status_test.go index 5a7c71ce8..4a07ea71f 100644 --- a/command/job_status_test.go +++ b/command/job_status_test.go @@ -113,9 +113,12 @@ func TestJobStatusCommand_Run(t *testing.T) { if !strings.Contains(out, "Allocations") { t.Fatalf("should dump allocations") } - if !strings.Contains(out, "Created At") { + if !strings.Contains(out, "Created") { t.Fatal("should have created header") } + if !strings.Contains(out, "Modified") { + t.Fatal("should have modified header") + } ui.ErrorWriter.Reset() ui.OutputWriter.Reset() @@ -138,6 +141,14 @@ func TestJobStatusCommand_Run(t *testing.T) { if !strings.Contains(out, "job1_sfx") || strings.Contains(out, "job2_sfx") { t.Fatalf("expected only job1_sfx, got: %s", out) } + + if !strings.Contains(out, "Created") {
t.Fatal("should have created header") + } + + if !strings.Contains(out, "Modified") { + t.Fatal("should have modified header") + } ui.OutputWriter.Reset() // Query in short view mode diff --git a/e2e/migrations/docker-run.sh b/e2e/migrations/docker-run.sh index 4b3e71f92..128599665 100755 --- a/e2e/migrations/docker-run.sh +++ b/e2e/migrations/docker-run.sh @@ -4,4 +4,11 @@ ROOT_DIRECTORY="$( dirname "$(dirname "$CURRENT_DIRECTORY")")" docker run --privileged -v \ $ROOT_DIRECTORY:/gopkg/src/github.com/hashicorp/nomad \ -it nomad-e2e /bin/bash \ --c "cd gopkg/src/github.com/hashicorp/nomad/e2e/migrations && go test -integration" +-c "cd gopkg/src/github.com/hashicorp/nomad/e2e/migrations && go test --run \ +TestJobMigrations -integration" + +docker run --privileged \ +-v $ROOT_DIRECTORY:/gopkg/src/github.com/hashicorp/nomad \ +-it nomad-e2e /bin/bash \ +-c "cd gopkg/src/github.com/hashicorp/nomad/e2e/migrations && go test --run \ +TestMigrations_WithACLs -integration" diff --git a/e2e/migrations/migrations_test.go b/e2e/migrations/migrations_test.go index 6752800ac..14140aa96 100644 --- a/e2e/migrations/migrations_test.go +++ b/e2e/migrations/migrations_test.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" "flag" + "fmt" "io/ioutil" "os" "os/exec" @@ -108,15 +109,26 @@ func isSuccess(execCmd *exec.Cmd, retries int, keyword string) (string, error) { // allNodesAreReady attempts to query the status of a cluster a specific number // of times -func allNodesAreReady(retries int) (string, error) { - cmd := exec.Command("nomad", "node-status") +func allNodesAreReady(retries int, flags string) (string, error) { + var cmd *exec.Cmd + if flags != "" { + cmd = exec.Command("nomad", "node-status", flags) + } else { + cmd = exec.Command("nomad", "node-status") + } + return isSuccess(cmd, retries, "initializing") } // jobIsReady attempts sto query the status of a specific job a fixed number of // times -func jobIsReady(retries int, jobName string) (string, error) { - cmd := exec.Command("nomad", "job", "status", jobName) +func jobIsReady(retries int, flags, jobName string) (string, error) { + var cmd *exec.Cmd + if flags != "" { + cmd = exec.Command("nomad", "job", "status", flags, jobName) + } else { + cmd = exec.Command("nomad", "job", "status", jobName) + } return isSuccess(cmd, retries, "pending") } @@ -146,6 +158,60 @@ func startCluster(clusterConfig []string) (func(), error) { return f, nil } +func bootstrapACL() (string, error) { + var bootstrapOut bytes.Buffer + + bootstrapCmd := exec.Command("nomad", "acl", "bootstrap") + bootstrapCmd.Stdout = &bootstrapOut + + if err := bootstrapCmd.Run(); err != nil { + return "", err + } + + parts := strings.Split(bootstrapOut.String(), "\n") + if len(parts) < 2 { + return "", fmt.Errorf("unexpected bootstrap output") + } + + secretIDLine := strings.Split(parts[1], " ") + if secretIDLine[0] != "Secret" { + return "", fmt.Errorf("unable to find secret id in bootstrap output") + } + return secretIDLine[len(secretIDLine)-1], nil +} + +func startACLServer(serverConfig string) (func(), string, error) { + cmd := exec.Command("nomad", "agent", "-config", serverConfig) + if err := cmd.Start(); err != nil { + return func() {}, "", err + } + + f := func() { + cmd.Process.Kill() + } + + var secretID string + var err error + testutil.WaitForResultRetries(2000, func() (bool, error) { + + secretIDOutput, err := bootstrapACL() + if err != nil { + return false, err + } + + secretID = secretIDOutput + return true, nil + }, func(cmd_err error) { + err = cmd_err + }) + + if err 
!= nil { + return func() {}, "", err + } + + return f, secretID, nil +} + func TestJobMigrations(t *testing.T) { flag.Parse() if !*integration { @@ -160,7 +226,7 @@ func TestJobMigrations(t *testing.T) { assert.Nil(err) defer stopCluster() - _, err = allNodesAreReady(10) + _, err = allNodesAreReady(10, "") assert.Nil(err) fh, err := ioutil.TempFile("", "nomad-sleep-1") @@ -175,7 +241,7 @@ func TestJobMigrations(t *testing.T) { err = jobCmd.Run() assert.Nil(err) - firstJobOutput, err := jobIsReady(20, "sleep") + firstJobOutput, err := jobIsReady(20, "", "sleep") assert.Nil(err) assert.NotContains(firstJobOutput, "failed") assert.NotContains(firstJobOutput, "pending") @@ -185,15 +251,69 @@ func TestJobMigrations(t *testing.T) { defer os.Remove(fh2.Name()) _, err = fh2.WriteString(sleepJobTwo) - assert.Nil(err) secondJobCmd := exec.Command("nomad", "run", fh2.Name()) err = secondJobCmd.Run() assert.Nil(err) - jobOutput, err := jobIsReady(20, "sleep") + jobOutput, err := jobIsReady(20, "", "sleep") assert.Nil(err) + assert.NotContains(jobOutput, "failed") + assert.Contains(jobOutput, "complete") +} + +func TestMigrations_WithACLs(t *testing.T) { + flag.Parse() + if !*integration { + t.Skip("skipping test in non-integration mode.") + } + + t.Parallel() + assert := assert.New(t) + + stopServer, secretID, err := startACLServer("server_acl.hcl") + assert.Nil(err) + defer stopServer() + + clusterConfig := []string{"client1.hcl", "client2.hcl"} + stopCluster, err := startCluster(clusterConfig) + assert.Nil(err) + defer stopCluster() + + _, err = allNodesAreReady(10, "-token="+secretID) + assert.Nil(err) + + fh, err := ioutil.TempFile("", "nomad-sleep-1") + assert.Nil(err) + + defer os.Remove(fh.Name()) + _, err = fh.WriteString(sleepJobOne) + + assert.Nil(err) + + jobCmd := exec.Command("nomad", "run", "-token="+secretID, fh.Name()) + err = jobCmd.Run() + assert.Nil(err) + + _, err = jobIsReady(20, "-token="+secretID, "sleep") + assert.Nil(err) + + fh2, err := ioutil.TempFile("", "nomad-sleep-2") + assert.Nil(err) + + defer os.Remove(fh2.Name()) + _, err = fh2.WriteString(sleepJobTwo) + + assert.Nil(err) + + secondJobCmd := exec.Command("nomad", "run", "-token="+secretID, fh2.Name()) + err = secondJobCmd.Run() + assert.Nil(err) + + jobOutput, err := jobIsReady(20, "-token="+secretID, "sleep") + assert.Nil(err) + assert.NotContains(jobOutput, "failed") assert.NotContains(jobOutput, "pending") assert.Contains(jobOutput, "complete") diff --git a/e2e/migrations/server_acl.hcl b/e2e/migrations/server_acl.hcl new file mode 100644 index 000000000..60049c2e8 --- /dev/null +++ b/e2e/migrations/server_acl.hcl @@ -0,0 +1,13 @@ +log_level = "DEBUG" + +data_dir = "/tmp/server1_acl" + +server { + enabled = true + bootstrap_expect = 1 +} + +acl { + enabled = true +} + diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index c130e90af..7f4265fb9 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -820,6 +820,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene return fmt.Errorf("must update at least one allocation") } + // Update modified timestamp for client initiated allocation updates + now := time.Now().UTC().UnixNano() + for _, alloc := range args.Alloc { + alloc.ModifyTime = now + } // Add this to the batch n.updatesLock.Lock() n.updates = append(n.updates, args.Alloc...) 
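Stepping back from the `Node.UpdateAlloc` change above: the server stamps one shared UTC wall-clock value across every allocation in a client's update batch, so allocations written together carry identical modify times. A minimal sketch of that pattern, using a pared-down, hypothetical stand-in for `structs.Allocation`:

```go
package main

import (
	"fmt"
	"time"
)

// alloc is a simplified, hypothetical stand-in for structs.Allocation,
// modeling only the field this patch adds.
type alloc struct {
	ID         string
	ModifyTime int64 // UnixNano; stamped server-side on client updates
}

// stampModifyTime applies a single timestamp to the whole batch, as
// Node.UpdateAlloc does, before the updates are appended for Raft apply.
func stampModifyTime(batch []*alloc) {
	now := time.Now().UTC().UnixNano()
	for _, a := range batch {
		a.ModifyTime = now
	}
}

func main() {
	batch := []*alloc{{ID: "alloc-1"}, {ID: "alloc-2"}}
	stampModifyTime(batch)
	fmt.Println(batch[0].ModifyTime == batch[1].ModifyTime) // true
}
```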
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index e50e96826..909e2a637 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1323,8 +1323,10 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { node.ModifyIndex = resp.Index // Inject fake evaluations async + now := time.Now().UTC().UnixNano() alloc := mock.Alloc() alloc.NodeID = node.ID + alloc.ModifyTime = now state := s1.fsm.State() state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) start := time.Now() @@ -1363,6 +1365,32 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { t.Fatalf("bad: %#v", resp2.Allocs) } + iter, err := state.AllocsByIDPrefix(nil, structs.DefaultNamespace, alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + getAllocs := func(iter memdb.ResultIterator) []*structs.Allocation { + var allocs []*structs.Allocation + for { + raw := iter.Next() + if raw == nil { + break + } + allocs = append(allocs, raw.(*structs.Allocation)) + } + return allocs + } + out := getAllocs(iter) + + if len(out) != 1 { + t.Fatalf("Expected to get one allocation but got: %v", out) + } + + if out[0].ModifyTime != now { + t.Fatalf("Invalid modify time %v", out[0].ModifyTime) + } + // Alloc updates fire watches time.AfterFunc(100*time.Millisecond, func() { allocUpdate := mock.Alloc() @@ -1675,6 +1703,10 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { if out.ClientStatus != structs.AllocClientStatusFailed { t.Fatalf("Bad: %#v", out) } + + if out.ModifyTime <= 0 { + t.Fatalf("must have valid modify time but was %v", out.ModifyTime) + } } func TestClientEndpoint_BatchUpdate(t *testing.T) { diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 9e82c54f6..5d2c29bcf 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -150,6 +150,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap if alloc.CreateTime == 0 { alloc.CreateTime = now } + alloc.ModifyTime = now } // Dispatch the Raft transaction diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index f4341d49c..7d3195d92 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -147,6 +147,16 @@ func TestPlanApply_applyPlan(t *testing.T) { t.Fatalf("missing alloc") } + if out.CreateTime <= 0 { + t.Fatalf("invalid create time %v", out.CreateTime) + } + if out.ModifyTime <= 0 { + t.Fatalf("invalid modify time %v", out.ModifyTime) + } + if out.CreateTime != out.ModifyTime { + t.Fatalf("create time %v and modify time %v must be equal", out.CreateTime, out.ModifyTime) + } + // Lookup the new deployment dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID) if err != nil { @@ -226,6 +236,10 @@ func TestPlanApply_applyPlan(t *testing.T) { t.Fatalf("missing job") } + if out.ModifyTime <= 0 { + t.Fatalf("must have valid modify time but was %v", out.ModifyTime) + } + // Lookup the allocation out, err = s1.fsm.State().AllocByID(ws, alloc2.ID) if err != nil { diff --git a/nomad/rpc.go b/nomad/rpc.go index 45efd57cb..4b0dbf828 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -100,9 +100,11 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { // Enforce TLS if EnableRPC is set if s.config.TLSConfig.EnableRPC && !isTLS && RPCType(buf[0]) != rpcTLS { - s.logger.Printf("[WARN] nomad.rpc: Non-TLS connection attempted with RequireTLS set") - conn.Close() - return + if !s.config.TLSConfig.RPCUpgradeMode { + s.logger.Printf("[WARN] nomad.rpc: Non-TLS connection attempted from %s with RequireTLS set", conn.RemoteAddr().String()) + conn.Close()
+ return + } } // Switch on the byte diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index d8890f34e..392bb6870 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -3,10 +3,17 @@ package nomad import ( "net" "net/rpc" + "os" + "path" "testing" "time" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" ) // rpcClient is a test helper method to return a ClientCodec to use to make rpc @@ -84,3 +91,83 @@ func TestRPC_forwardRegion(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestRPC_PlaintextRPCSucceedsWhenInUpgradeMode(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + const ( + cafile = "../helper/tlsutil/testdata/ca.pem" + foocert = "../helper/tlsutil/testdata/nomad-foo.pem" + fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" + ) + dir := tmpDir(t) + defer os.RemoveAll(dir) + + s1 := testServer(t, func(c *Config) { + c.DataDir = path.Join(dir, "node1") + c.TLSConfig = &config.TLSConfig{ + EnableRPC: true, + VerifyServerHostname: true, + CAFile: cafile, + CertFile: foocert, + KeyFile: fookey, + RPCUpgradeMode: true, + } + }) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + + // Create the register request + node := mock.Node() + req := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp) + assert.Nil(err) + + // Check that heartbeatTimers has the heartbeat ID + _, ok := s1.heartbeatTimers[node.ID] + assert.True(ok) +} + +func TestRPC_PlaintextRPCFailsWhenNotInUpgradeMode(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + const ( + cafile = "../helper/tlsutil/testdata/ca.pem" + foocert = "../helper/tlsutil/testdata/nomad-foo.pem" + fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" + ) + dir := tmpDir(t) + defer os.RemoveAll(dir) + + s1 := testServer(t, func(c *Config) { + c.DataDir = path.Join(dir, "node1") + c.TLSConfig = &config.TLSConfig{ + EnableRPC: true, + VerifyServerHostname: true, + CAFile: cafile, + CertFile: foocert, + KeyFile: fookey, + } + }) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + + node := mock.Node() + req := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp) + assert.NotNil(err) +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 62bb49480..0091055b3 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1718,6 +1718,9 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a // Update the modify index copyAlloc.ModifyIndex = index + // Update the modify time + copyAlloc.ModifyTime = alloc.ModifyTime + if err := s.updateDeploymentWithAlloc(index, copyAlloc, exist, txn); err != nil { return fmt.Errorf("error updating deployment: %v", err) } diff --git a/nomad/structs/config/tls.go b/nomad/structs/config/tls.go index 2baa76a07..322e5581d 100644 --- a/nomad/structs/config/tls.go +++ b/nomad/structs/config/tls.go @@ -29,6 +29,11 @@ type TLSConfig struct { // Must be provided to serve TLS connections. 
KeyFile string `mapstructure:"key_file"` + // RPCUpgradeMode should be enabled when a cluster is being upgraded + // to TLS. Allows servers to accept both plaintext and TLS connections and + // should only be a temporary state. + RPCUpgradeMode bool `mapstructure:"rpc_upgrade_mode"` + // Verify connections to the HTTPS API VerifyHTTPSClient bool `mapstructure:"verify_https_client"` } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 42e7db18d..fbe93de85 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4605,6 +4605,9 @@ type Allocation struct { // CreateTime is the time the allocation has finished scheduling and been // verified by the plan applier. CreateTime int64 + + // ModifyTime is the time the allocation was last updated. + ModifyTime int64 } // Index returns the index of the allocation. If the allocation is from a task @@ -4746,6 +4749,7 @@ func (a *Allocation) Stub() *AllocListStub { CreateIndex: a.CreateIndex, ModifyIndex: a.ModifyIndex, CreateTime: a.CreateTime, + ModifyTime: a.ModifyTime, } } @@ -4767,6 +4771,7 @@ type AllocListStub struct { CreateIndex uint64 ModifyIndex uint64 CreateTime int64 + ModifyTime int64 } // AllocMetric is used to track various metrics while attempting diff --git a/website/source/api/allocations.html.md b/website/source/api/allocations.html.md index ab6a97511..6655d52e0 100644 --- a/website/source/api/allocations.html.md +++ b/website/source/api/allocations.html.md @@ -141,7 +141,8 @@ $ curl \ }, "CreateIndex": 54, "ModifyIndex": 57, - "CreateTime": 1495747371794276400 + "CreateTime": 1495747371794276400, + "ModifyTime": 1495747371794276400 } ] ``` @@ -461,7 +462,8 @@ $ curl \ "CreateIndex": 54, "ModifyIndex": 57, "AllocModifyIndex": 54, - "CreateTime": 1495747371794276400 + "CreateTime": 1495747371794276400, + "ModifyTime": 1495747371794276400 } ``` diff --git a/website/source/api/deployments.html.md b/website/source/api/deployments.html.md index e9163bf0b..ed4029968 100644 --- a/website/source/api/deployments.html.md +++ b/website/source/api/deployments.html.md @@ -253,7 +253,8 @@ $ curl \ "DeploymentStatus": null, "CreateIndex": 19, "ModifyIndex": 22, - "CreateTime": 1498775380678486300 + "CreateTime": 1498775380678486300, + "ModifyTime": 1498775380678486300 } ] ``` diff --git a/website/source/api/jobs.html.md b/website/source/api/jobs.html.md index bfbb51ddd..eb932f954 100644 --- a/website/source/api/jobs.html.md +++ b/website/source/api/jobs.html.md @@ -761,7 +761,8 @@ $ curl \ }, "CreateIndex": 9, "ModifyIndex": 13, - "CreateTime": 1495755675944527600 + "CreateTime": 1495755675944527600, + "ModifyTime": 1495755675944527600 } ] ``` diff --git a/website/source/api/nodes.html.md b/website/source/api/nodes.html.md index 9732127ee..0d7ff6c78 100644 --- a/website/source/api/nodes.html.md +++ b/website/source/api/nodes.html.md @@ -504,7 +504,8 @@ $ curl \ "CreateIndex": 15052, "ModifyIndex": 15057, "AllocModifyIndex": 15052, - "CreateTime": 1502140975600438500 + "CreateTime": 1502140975600438500, + "ModifyTime": 1502140975600438500 }, ... ] diff --git a/website/source/docs/agent/configuration/tls.html.md b/website/source/docs/agent/configuration/tls.html.md index 5534eef7a..b816f7f21 100644 --- a/website/source/docs/agent/configuration/tls.html.md +++ b/website/source/docs/agent/configuration/tls.html.md @@ -54,6 +54,10 @@ the [Agent's Gossip and RPC Encryption](/docs/agent/encryption.html). a Nomad client makes the client use TLS for making RPC requests to the Nomad servers. 
+- `rpc_upgrade_mode` `(bool: false)` - This option should be used only when the + cluster is being upgraded to TLS, and removed after the migration is + complete. This allows the agent to accept both TLS and plaintext traffic. + - `verify_https_client` `(bool: false)` - Specifies agents should require client certificates for all incoming HTTPS requests. The client certificates must be signed by the same CA as Nomad. diff --git a/website/source/docs/commands/alloc-status.html.md.erb b/website/source/docs/commands/alloc-status.html.md.erb index ce61f963d..36bf422cb 100644 --- a/website/source/docs/commands/alloc-status.html.md.erb +++ b/website/source/docs/commands/alloc-status.html.md.erb @@ -11,7 +11,8 @@ description: > The `alloc-status` command displays status information and metadata about an existing allocation and its tasks. It can be useful while debugging to reveal the underlying reasons for scheduling decisions or failures, as well as the -current state of its tasks. +current state of its tasks. As of Nomad 0.7.1, alloc status also shows allocation +modification time in addition to create time. ## Usage @@ -74,7 +75,8 @@ Client Status = running Client Description = Desired Status = run Desired Description = -Created At = 07/25/17 16:12:48 UTC +Created = 5m ago +Modified = 5m ago Deployment ID = 0c83a3b1 Deployment Health = healthy @@ -127,7 +129,8 @@ Client Status = running Client Description = Desired Status = run Desired Description = -Created At = 07/25/17 16:12:48 UTC +Created = 07/25/17 16:12:48 UTC +Modified = 07/25/17 16:12:48 UTC Deployment ID = 0c83a3b1-8a7b-136b-0e11-8383dc6c9276 Deployment Health = healthy Evaluated Nodes = 1 diff --git a/website/source/docs/commands/job/status.html.md.erb b/website/source/docs/commands/job/status.html.md.erb index ff67b3bd6..5ff67823c 100644 --- a/website/source/docs/commands/job/status.html.md.erb +++ b/website/source/docs/commands/job/status.html.md.erb @@ -22,7 +22,9 @@ the specific job is queried and displayed. Otherwise, a list of matching jobs an information will be displayed. If the ID is omitted, the command lists out all of the existing jobs and a few of -the most useful status fields for each. +the most useful status fields for each. As of Nomad 0.7.1, job status also shows allocation +modification time in addition to create time. When the `-verbose` flag is not set, allocation +creation and modify times are shown in a shortened relative time format like `5m ago`. ## General Options @@ -38,7 +40,7 @@ the most useful status fields for each. * `-short`: Display short output. Used only when a single node is being queried. Drops verbose node allocation data from the output. -* `-verbose`: Show full information. +* `-verbose`: Show full information. Allocation create and modify times are shown in `mm/dd/yy hh:mm:ss UTC` format.
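For reference, the verbose timestamps in the examples below correspond to a Go time layout inferred from the `07/25/17 15:55:27 UTC` samples in this doc; the actual helper (`formatUnixNanoTime`) is not shown in this patch, so treat the layout string as an assumption:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Sample CreateTime value taken from the API documentation above.
	t := time.Unix(0, 1495747371794276400).UTC()
	// Layout inferred from the verbose examples; assumed, not confirmed.
	fmt.Println(t.Format("01/02/06 15:04:05 MST")) // 05/25/17 21:22:51 UTC
}
```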
## Examples @@ -95,8 +97,8 @@ Task Group Desired Placed Healthy Unhealthy cache 1 1 1 0 Allocations -ID Node ID Task Group Version Desired Status Created At -478ce836 5ed166e8 cache 0 run running 07/25/17 15:53:04 UTC +ID Node ID Task Group Version Desired Status Created Modified +478ce836 5ed166e8 cache 0 run running 5m ago 5m ago ``` Full status information of a periodic job: @@ -187,11 +189,11 @@ Task Group Desired Placed Healthy Unhealthy cache 5 4 4 0 Allocations -ID Node ID Task Group Version Desired Status Created At -048c1e9e 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC -250f9dec 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC -2eb772a1 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC -a17b7d3d 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC +ID Node ID Task Group Version Desired Status Created Modified +048c1e9e 3f38ecb4 cache 0 run running 5m ago 5m ago +250f9dec 3f38ecb4 cache 0 run running 5m ago 5m ago +2eb772a1 3f38ecb4 cache 0 run running 5m ago 5m ago +a17b7d3d 3f38ecb4 cache 0 run running 5m ago 5m ago ``` Full status information showing evaluations with a placement failure. The in @@ -240,9 +242,9 @@ Task Group Desired Placed Healthy Unhealthy cache 5 4 4 0 Allocations -ID Node ID Task Group Version Desired Status Created At -048c1e9e 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC -250f9dec 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC -2eb772a1 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC -a17b7d3d 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC +ID Node ID Task Group Version Desired Status Created Modified +048c1e9e 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC +250f9dec 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC +2eb772a1 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC +a17b7d3d 3f38ecb4 cache 0 run running 07/25/17 15:55:27 UTC 07/25/17 15:55:27 UTC ``` diff --git a/website/source/intro/getting-started/jobs.html.md b/website/source/intro/getting-started/jobs.html.md index 26287586a..c64092bfb 100644 --- a/website/source/intro/getting-started/jobs.html.md +++ b/website/source/intro/getting-started/jobs.html.md @@ -79,8 +79,8 @@ Task Group Desired Placed Healthy Unhealthy cache 1 1 1 0 Allocations -ID Node ID Task Group Version Desired Status Created At -883269bf e42d6f19 cache 0 run running 10/31/17 22:58:40 UTC +ID Node ID Task Group Version Desired Status Created Modified +8ba85cef 171a583b cache 0 run running 5m ago 5m ago ``` Here we can see that the result of our evaluation was the creation of an @@ -101,8 +101,9 @@ Client Status = running Client Description = Desired Status = run Desired Description = -Created At = 10/31/17 22:58:40 UTC -Deployment ID = b0a84e74 +Created = 5m ago +Modified = 5m ago +Deployment ID = fa882a5b Deployment Health = healthy Task "redis" is "running" @@ -326,13 +327,10 @@ Task Group Desired Placed Healthy Unhealthy cache 3 3 3 0 Allocations -ID Node ID Task Group Version Desired Status Created At -7dce5722 e42d6f19 cache 2 stop complete 11/01/17 17:31:16 UTC -8cfab5f4 e42d6f19 cache 2 stop complete 11/01/17 17:31:02 UTC -27bd4a41 e42d6f19 cache 2 stop complete 11/01/17 17:30:40 UTC -3249e320 e42d6f19 cache 1 stop complete 11/01/17 17:28:28 UTC -453b210f e42d6f19 cache 1 stop complete 11/01/17 17:28:28 UTC -883269bf e42d6f19 cache 1 stop complete 10/31/17 22:58:40 UTC +ID Node ID Task Group Version Desired Status Created Modified +8ace140d 2cfe061e cache 2 stop complete 5m ago 5m ago +8af5330a 2cfe061e cache 2 stop complete 6m
ago 6m ago +df50c3ae 2cfe061e cache 2 stop complete 6m ago 6m ago ``` If we wanted to start the job again, we could simply `run` it again.