diff --git a/nomad/drain.go b/nomad/drain.go
index a1dc99972..01732db14 100644
--- a/nomad/drain.go
+++ b/nomad/drain.go
@@ -13,6 +13,12 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+// jobKey is a tuple of namespace+jobid for use as a map key by job
+type jobKey struct {
+	ns    string
+	jobid string
+}
+
 // drainingJob contains the Job and allocations for that job meant to be used
 // when collecting all allocations for a job with at least one allocation on a
 // draining node.
@@ -48,19 +54,14 @@ func makeTaskGroupKey(a *structs.Allocation) string {
 
 // stopAllocs tracks allocs to drain by a unique TG key
 type stopAllocs struct {
-	perTaskGroup map[string]int
-	allocBatch   []*structs.Allocation
+	allocBatch []*structs.Allocation
 
 	// namespace+jobid -> Job
-	jobBatch map[string]*structs.Job
+	jobBatch map[jobKey]*structs.Job
 }
 
 //FIXME this method does an awful lot
 func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
-	// Increment the counter for how many allocs in this task group are being stopped
-	tgKey := makeTaskGroupKey(a)
-	s.perTaskGroup[tgKey]++
-
 	// Update the allocation
 	a.ModifyTime = time.Now().UnixNano()
 	a.DesiredStatus = structs.AllocDesiredStatusStop
@@ -69,8 +70,7 @@ func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
 	s.allocBatch = append(s.allocBatch, a)
 
 	// Add job to the job batch
-	jobKey := strings.Join([]string{j.Namespace, j.ID}, "-")
-	s.jobBatch[jobKey] = j
+	s.jobBatch[jobKey{a.Namespace, a.JobID}] = j
 }
 
 // startNodeDrainer should be called in establishLeadership by the leader.
@@ -87,7 +87,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 		}
 	}()
 
-	nodes, nodesIndex, drainingAllocs, allocsIndex := initDrainer(s.logger, state)
+	nodes, nodesIndex, drainingJobs, allocsIndex := initDrainer(s.logger, state)
 
 	// Wait for a node's drain deadline to expire
 	var nextDeadline time.Time
@@ -108,8 +108,9 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 	go nodeWatcher.run(ctx)
 
 	// Watch for drained allocations to be replaced
-	prevAllocs := newPrevAllocWatcher(s.logger, drainingAllocs, allocsIndex, state)
-	go prevAllocs.run(ctx)
+	// Watch for changes in allocs for jobs with allocs on draining nodes
+	jobWatcher := newJobWatcher(s.logger, drainingJobs, allocsIndex, state)
+	go jobWatcher.run(ctx)
 
 	for {
 		//TODO this method of async node updates means we could make
@@ -117,16 +118,43 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 		//possible outcome of this is that an allocation could be
 		//stopped on a node that recently had its drain cancelled which
 		//doesn't seem like that bad of a pathological case
+		s.logger.Printf("[TRACE] nomad.drain: LOOP next deadline: %s (%s)", nextDeadline, time.Until(nextDeadline))
 		select {
 		case nodes = <-nodeWatcher.nodesCh:
 			// update draining nodes
-			//TODO remove allocs from draining list with node ids not in this map
 			s.logger.Printf("[TRACE] nomad.drain: running due to node change (%d nodes draining)", len(nodes))
-		case drainedID := <-prevAllocs.allocsCh:
-			// drained alloc has been replaced
-			//TODO instead of modifying a view of draining allocs here created a shared map like prevallocs
-			delete(drainingAllocs, drainedID)
-			s.logger.Printf("[TRACE] nomad.drain: running due to alloc change (%s replaced)", drainedID)
+
+			// update deadline timer
+			changed := false
+			for _, n := range nodes {
+				if nextDeadline.IsZero() {
+					nextDeadline = n.DrainStrategy.DeadlineTime()
+					changed = true
+					continue
+				}
+
+				if deadline := n.DrainStrategy.DeadlineTime(); deadline.Before(nextDeadline) {
+					nextDeadline = deadline
+					changed = true
+				}
+			}
+
+			// if changed reset the timer
+			if changed {
+				s.logger.Printf("[TRACE] nomad.drain: new node deadline: %s", nextDeadline)
+				if !deadlineTimer.Stop() {
+					// timer may have been recv'd in a
+					// previous loop, so don't block
+					select {
+					case <-deadlineTimer.C:
+					default:
+					}
+				}
+				deadlineTimer.Reset(time.Until(nextDeadline))
+			}
+
+		case jobs := <-jobWatcher.WaitCh():
+			s.logger.Printf("[TRACE] nomad.drain: running due to alloc change (%d jobs updated)", len(jobs))
 		case when := <-deadlineTimer.C:
 			// deadline for a node was reached
 			s.logger.Printf("[TRACE] nomad.drain: running due to deadline reached (at %s)", when)
@@ -148,7 +176,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 		}
 
 		now := time.Now() // for determing deadlines in a consistent way
-		// namespace -> job id -> {job, allocs}
+		// job key -> {job, allocs}
 		// Collect all allocs for all jobs with at least one
 		// alloc on a draining node.
 		// Invariants:
@@ -156,7 +184,15 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 		// - No batch jobs unless their node's deadline is reached
 		// - No entries with 0 allocs
 		//TODO could this be a helper method on prevAllocWatcher
-		drainable := map[string]map[string]*drainingJob{}
+		drainable := map[jobKey]*drainingJob{}
+
+		// track jobs we've looked up before and know we shouldn't
+		// consider for draining eg system jobs
+		skipJob := map[jobKey]struct{}{}
+
+		// track number of "up" allocs per task group (not terminal and
+		// have a deployment status)
+		upPerTG := map[string]int{}
 
 		// Collect all drainable jobs
 		for nodeID, node := range nodes {
@@ -169,13 +205,15 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 			// track number of allocs left on this node to be drained
 			allocsLeft := false
 			for _, alloc := range allocs {
-				if _, ok := drainable[alloc.Namespace]; !ok {
-					// namespace does not exist
-					drainable[alloc.Namespace] = make(map[string]*drainingJob)
+				jobkey := jobKey{alloc.Namespace, alloc.JobID}
+
+				if _, ok := drainable[jobkey]; ok {
+					// already found
+					continue
 				}
 
-				if _, ok := drainable[alloc.Namespace][alloc.JobID]; ok {
-					// already found
+				if _, ok := skipJob[jobkey]; ok {
+					// already looked up and skipped
 					continue
 				}
 
@@ -185,21 +223,27 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 					//FIXME
 					panic(err)
 				}
-				//TODO check for job == nil?
 
 				// Don't bother collecting system jobs
 				if job.Type == structs.JobTypeSystem {
+					skipJob[jobkey] = struct{}{}
+					s.logger.Printf("[TRACE] nomad.drain: skipping system job %s", job.Name)
 					continue
 				}
 
-				// If a drainable alloc isn't yet stopping this
-				// node has allocs left to be drained
+				// If alloc isn't yet terminal this node has
+				// allocs left to be drained
 				if !alloc.TerminalStatus() {
-					allocsLeft = true
+					if !allocsLeft {
+						s.logger.Printf("[TRACE] nomad.drain: node %s has allocs left to drain", nodeID[:6])
+						allocsLeft = true
+					}
 				}
 
 				// Don't bother collecting batch jobs for nodes that haven't hit their deadline
 				if job.Type == structs.JobTypeBatch && node.DrainStrategy.DeadlineTime().After(now) {
+					s.logger.Printf("[TRACE] nomad.drain: not draining batch job %s because deadline isn't for %s", job.Name, node.DrainStrategy.DeadlineTime().Sub(now))
+					skipJob[jobkey] = struct{}{}
 					continue
 				}
 
@@ -209,100 +253,109 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 					panic(err)
 				}
 
-				drainable[alloc.Namespace][alloc.JobID] = &drainingJob{
+				// Count the number of up (non-terminal, with a set deployment status) allocs per task group
+				if job.Type == structs.JobTypeService {
+					n := 0
+					for _, a := range jobAllocs {
+						if !a.TerminalStatus() && a.DeploymentStatus != nil {
+							upPerTG[makeTaskGroupKey(a)]++
+							n++
+						}
+					}
+					s.logger.Printf("[TRACE] nomad.drain: job %s has %d task groups running", job.Name, n)
+				}
+
+				drainable[jobkey] = &drainingJob{
 					job:    job,
 					allocs: jobAllocs,
 				}
+
+				jobWatcher.watch(jobkey, nodeID)
 			}
 
 			// if node has no allocs, it's done draining!
 			if !allocsLeft {
+				s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain", nodeID)
+				jobWatcher.nodeDone(nodeID)
 				delete(nodes, nodeID)
 				doneNodes[nodeID] = node
 			}
 		}
 
-		// Initialize stoplist with a count of allocs already draining per task group
-		//TODO wrap this up in a new func
+		// stoplist are the allocations to stop and their jobs to emit
+		// evaluations for
 		stoplist := &stopAllocs{
-			perTaskGroup: make(map[string]int, len(drainingAllocs)),
-			allocBatch:   make([]*structs.Allocation, len(drainingAllocs)),
-			jobBatch:     make(map[string]*structs.Job),
-		}
-		// initialize perTaskGroup to be the number of total *currently draining* allocations per task group
-		for _, a := range drainingAllocs {
-			stoplist.perTaskGroup[a.tgKey]++
+			allocBatch: make([]*structs.Allocation, 0, len(drainable)),
+			jobBatch:   make(map[jobKey]*structs.Job),
 		}
 
 		// deadlineNodes is a map of node IDs that have reached their
 		// deadline and allocs that will be stopped due to deadline
 		deadlineNodes := map[string]int{}
 
-		//TODO build drain list considering deadline & max_parallel
-		for _, drainingJobs := range drainable {
-			for _, drainingJob := range drainingJobs {
-				for _, alloc := range drainingJob.allocs {
-					// Already draining/dead allocs don't need to be drained
-					if alloc.TerminalStatus() {
-						continue
-					}
+		// build drain list considering deadline & max_parallel
+		for _, drainingJob := range drainable {
+			for _, alloc := range drainingJob.allocs {
+				// Already draining/dead allocs don't need to be drained
+				if alloc.TerminalStatus() {
+					continue
+				}
 
-					node, ok := nodes[alloc.NodeID]
-					if !ok {
-						// Alloc's node is not draining so not elligible for draining!
-						continue
-					}
+				node, ok := nodes[alloc.NodeID]
+				if !ok {
+					// Alloc's node is not draining so not eligible for draining!
+					continue
+				}
 
-					if node.DrainStrategy.DeadlineTime().Before(now) {
-						s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
-						// Alloc's Node has reached its deadline
-						stoplist.add(drainingJob.job, alloc)
+				tgKey := makeTaskGroupKey(alloc)
 
-						deadlineNodes[node.ID]++
+				if node.DrainStrategy.DeadlineTime().Before(now) {
+					s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
+					// Alloc's Node has reached its deadline
+					stoplist.add(drainingJob.job, alloc)
+					upPerTG[tgKey]--
 
-						//FIXME purge from watchlist?
-						continue
-					}
+					deadlineNodes[node.ID]++
 
-					// Batch jobs are only stopped when the node
-					// deadline is reached which has already been
-					// done.
-					if drainingJob.job.Type == structs.JobTypeBatch {
-						continue
-					}
+					continue
+				}
 
-					// Stop allocs with count=1, max_parallel==0, or draining how many allocs are
-					// already draining for this task
-					// group, drain and track this alloc
-					tgKey := makeTaskGroupKey(alloc)
+				// Batch jobs are only stopped when the node
+				// deadline is reached which has already been
+				// done.
+				if drainingJob.job.Type == structs.JobTypeBatch {
+					continue
+				}
 
-					//FIXME change this to be based off of the sum(deploymentstatus!=nil && clientstatus==running) for this task group
-					if tg.Migrate.MaxParallel > stoplist.perTaskGroup[tgKey] {
-						s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
-						// More migrations are allowed, add to stoplist
-						stoplist.add(drainingJob.job, alloc)
+				s.logger.Printf("[TRACE] nomad.drain: considering job %s alloc %s count %d maxp %d up %d",
+					drainingJob.job.Name, alloc.ID[:6], tg.Count, tg.Migrate.MaxParallel, upPerTG[tgKey])
 
-						// Also add to prevAllocWatcher
-						prevAllocs.watch(alloc.ID)
-					}
+				// Count - MaxParallel = minimum number of allocations that must be "up"
+				minUp := (tg.Count - tg.Migrate.MaxParallel)
+				// If minimum is < the current number up it is safe to stop one.
+				if minUp < upPerTG[tgKey] {
+					s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
+					// More migrations are allowed, add to stoplist
+					stoplist.add(drainingJob.job, alloc)
+					upPerTG[tgKey]--
 				}
 			}
 		}
@@ -310,6 +363,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
 		// log drains due to node deadlines
 		for nodeID, remaining := range deadlineNodes {
 			s.logger.Printf("[DEBUG] nomad.drain: node %s drain deadline reached; stopping %d remaining allocs", nodeID, remaining)
+			jobWatcher.nodeDone(nodeID)
 		}
 
 		if len(stoplist.allocBatch) > 0 {
@@ -425,17 +479,16 @@ func (n *nodeWatcher) run(ctx context.Context) {
 		newNodes := resp.([]*structs.Node)
 		n.logger.Printf("[TRACE] nomad.drain: %d nodes to consider", len(newNodes)) //FIXME remove
 		for _, newNode := range newNodes {
-			if _, ok := n.nodes[newNode.ID]; ok {
-				// Node was draining
+			if existingNode, ok := n.nodes[newNode.ID]; ok {
+				// Node was draining, see if it has changed
 				if !newNode.Drain {
 					// Node stopped draining
 					delete(n.nodes, newNode.ID)
 					changed = true
-				} else {
+				} else if !newNode.DrainStrategy.DeadlineTime().Equal(existingNode.DrainStrategy.DeadlineTime()) {
 					// Update deadline
 					n.nodes[newNode.ID] = newNode
-					//FIXME set changed if it changed?
-					//changed = true
+					changed = true
 				}
 			} else {
 				// Node was not draining
@@ -492,73 +545,78 @@ func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore)
 	return resp, index, nil
 }
 
-// prevAllocWatcher monitors allocation updates for allocations which replace
-// draining allocations.
-type prevAllocWatcher struct {
-	// watchList is a map of alloc ids to look for in PreviousAllocation
-	// fields of new allocs
-	watchList   map[string]struct{}
-	watchListMu sync.Mutex
+type jobWatcher struct {
+	// allocsIndex to start watching from
+	allocsIndex uint64
+
+	// job -> node.ID
+	jobs   map[jobKey]string
+	jobsMu sync.Mutex
+
+	jobsCh chan map[jobKey]struct{}
 
 	state *state.StateStore
 
-	// allocIndex to start watching from
-	allocIndex uint64
-
-	// allocsCh is sent Allocation.IDs as they're removed from the watchList
-	allocsCh chan string
-
 	logger *log.Logger
 }
 
-// newPrevAllocWatcher creates a new prevAllocWatcher watching drainingAllocs
-// from allocIndex in the state store. Must call run to start watching.
-func newPrevAllocWatcher(logger *log.Logger, drainingAllocs map[string]drainingAlloc, allocIndex uint64,
-	state *state.StateStore) *prevAllocWatcher {
-
-	watchList := make(map[string]struct{}, len(drainingAllocs))
-	for allocID := range drainingAllocs {
-		watchList[allocID] = struct{}{}
-	}
-
-	return &prevAllocWatcher{
-		watchList:  watchList,
-		state:      state,
-		allocIndex: allocIndex,
-		allocsCh:   make(chan string, 8), //FIXME 8? really? what should this be
-		logger:     logger,
+func newJobWatcher(logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, state *state.StateStore) *jobWatcher {
+	return &jobWatcher{
+		allocsIndex: allocsIndex,
+		logger:      logger,
+		jobs:        jobs,
+		jobsCh:      make(chan map[jobKey]struct{}),
+		state:       state,
 	}
 }
 
-// watch for an allocation ID to be replaced.
-func (p *prevAllocWatcher) watch(allocID string) {
-	p.watchListMu.Lock()
-	defer p.watchListMu.Unlock()
-	p.watchList[allocID] = struct{}{}
+func (j *jobWatcher) watch(k jobKey, nodeID string) {
+	j.logger.Printf("[TRACE] nomad.drain: watching job %s on draining node %s", k.jobid, nodeID[:6])
+	j.jobsMu.Lock()
+	j.jobs[k] = nodeID
+	j.jobsMu.Unlock()
 }
 
-// run the prevAllocWatcher and send replaced draining alloc IDs on allocsCh.
-func (p *prevAllocWatcher) run(ctx context.Context) {
-	// index to watch from
+func (j *jobWatcher) nodeDone(nodeID string) {
+	j.jobsMu.Lock()
+	defer j.jobsMu.Unlock()
+	for k, v := range j.jobs {
+		if v == nodeID {
+			j.logger.Printf("[TRACE] nomad.drain: UNwatching job %s on done draining node %s", k.jobid, nodeID[:6])
+			delete(j.jobs, k)
+		}
+	}
+}
+
+func (j *jobWatcher) WaitCh() <-chan map[jobKey]struct{} {
+	return j.jobsCh
+}
+
+func (j *jobWatcher) run(ctx context.Context) {
 	var resp interface{}
 	var err error
 	for {
+		//FIXME have watchAllocs create a closure and give it a copy of j.jobs to remove locking?
 		//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
-		resp, p.allocIndex, err = p.state.BlockingQuery(p.queryPrevAlloc, p.allocIndex, ctx)
+		var newIndex uint64
+		resp, newIndex, err = j.state.BlockingQuery(j.watchAllocs, j.allocsIndex, ctx)
 		if err != nil {
 			if err == context.Canceled {
-				p.logger.Printf("[TRACE] nomad.drain: previous allocation watcher shutting down")
+				j.logger.Printf("[TRACE] nomad.drain: job watcher shutting down")
 				return
 			}
 
-			p.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
+			j.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
 			return
 		}
 
-		allocIDs := resp.([]string)
-		for _, id := range allocIDs {
+		j.logger.Printf("[TRACE] nomad.drain: job watcher old index: %d new index: %d", j.allocsIndex, newIndex)
+		j.allocsIndex = newIndex
+
+		changedJobs := resp.(map[jobKey]struct{})
+		if len(changedJobs) > 0 {
 			select {
-			case p.allocsCh <- id:
+			case j.jobsCh <- changedJobs:
 			case <-ctx.Done():
 				return
 			}
@@ -566,8 +624,7 @@ func (p *prevAllocWatcher) run(ctx context.Context) {
 	}
 }
 
-// queryPrevAlloc is the BlockingQuery func for scanning for replacement allocs
-func (p *prevAllocWatcher) queryPrevAlloc(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
+func (j *jobWatcher) watchAllocs(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
 	iter, err := state.Allocs(ws)
 	if err != nil {
 		return nil, 0, err
@@ -578,11 +635,10 @@ func (p *prevAllocWatcher) queryPrevAlloc(ws memdb.WatchSet, state *state.StateS
 		return nil, 0, err
 	}
 
-	//FIXME do fine grained locking around watclist mutations?
-	p.watchListMu.Lock()
-	defer p.watchListMu.Unlock()
+	skipped := 0
 
-	resp := make([]string, 0, len(p.watchList))
+	// job ids
+	resp := map[jobKey]struct{}{}
 
 	for {
 		raw := iter.Next()
@@ -591,26 +647,35 @@ func (p *prevAllocWatcher) queryPrevAlloc(ws memdb.WatchSet, state *state.StateS
 		}
 
 		alloc := raw.(*structs.Allocation)
-		_, ok := p.watchList[alloc.PreviousAllocation]
+
+		j.jobsMu.Lock()
+		_, ok := j.jobs[jobKey{alloc.Namespace, alloc.JobID}]
+		j.jobsMu.Unlock()
+
 		if !ok {
-			// PreviousAllocation not in watchList, skip it
+			// alloc is not part of a draining job
+			skipped++
 			continue
 		}
 
-		// If the migration health is set on the replacement alloc we can stop watching the drained alloc
+		// don't wake drain loop if alloc hasn't updated its health
		if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
-			delete(p.watchList, alloc.PreviousAllocation)
-			resp = append(resp, alloc.PreviousAllocation)
+			j.logger.Printf("[TRACE] nomad.drain: job watcher found alloc %s - deployment status: %t", alloc.ID[:6], *alloc.DeploymentStatus.Healthy)
+			resp[jobKey{alloc.Namespace, alloc.JobID}] = struct{}{}
+		} else {
+			j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring alloc %s - no deployment status", alloc.ID[:6])
 		}
 	}
 
+	j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring %d allocs - not part of draining job at index %d", skipped, index)
+
 	return resp, index, nil
 }
 
 // initDrainer initializes the node drainer state and returns a list of
 // draining nodes as well as allocs that are draining that should be watched
 // for a replacement.
-func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*structs.Node, uint64, map[string]drainingAlloc, uint64) {
+func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*structs.Node, uint64, map[jobKey]string, uint64) {
 	// StateStore.Snapshot never returns an error so don't bother checking it
 	snapshot, _ := state.Snapshot()
 	now := time.Now()
@@ -624,9 +689,8 @@ func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*struc
 	// map of draining nodes keyed by node ID
 	nodes := map[string]*structs.Node{}
 
-	//FIXME rollup by composite namespace+job.ID+tg key?
-	// List of draining allocs by namespace and job: namespace -> job.ID -> alloc.ID -> *Allocation
-	allocsByNS := map[string]map[string]map[string]*structs.Allocation{}
+	// map of draining job IDs keyed by {namespace, job id} -> node.ID
+	jobs := map[jobKey]string{}
 
 	for {
 		raw := iter.Next()
@@ -655,88 +719,7 @@ func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*struc
 		}
 
 		for _, alloc := range allocs {
-			//FIXME is it safe to assume the drainer set the desired status to stop?
-			if alloc.DesiredStatus == structs.AllocDesiredStatusStop {
-				if allocsByJob, ok := allocsByNS[alloc.Namespace]; ok {
-					if allocs, ok := allocsByJob[alloc.JobID]; ok {
-						allocs[alloc.ID] = alloc
-					} else {
-						// First alloc for job
-						allocsByJob[alloc.JobID] = map[string]*structs.Allocation{alloc.ID: alloc}
-					}
-				} else {
-					// First alloc in namespace
-					allocsByNS[alloc.Namespace] = map[string]map[string]*structs.Allocation{
-						alloc.JobID: map[string]*structs.Allocation{alloc.ID: alloc},
-					}
-				}
-			}
-		}
-	}
-
-	// drainingAllocs is the list of all allocations that are currently
-	// draining and waiting for a replacement
-	drainingAllocs := map[string]drainingAlloc{}
-
-	for ns, allocsByJobs := range allocsByNS {
-		for jobID, allocs := range allocsByJobs {
-			for allocID, alloc := range allocs {
-				job, err := snapshot.JobByID(nil, ns, jobID)
-				if err != nil {
-					logger.Printf("[ERR] nomad.drain: error getting job %q for alloc %q: %v", alloc.JobID, allocID, err)
-					//FIXME
-					panic(err)
-				}
-
-				// Don't track drains for stopped or gc'd jobs
-				if job == nil || job.Status == structs.JobStatusDead {
-					continue
-				}
-
-				jobAllocs, err := snapshot.AllocsByJob(nil, ns, jobID, true)
-				if err != nil {
-					//FIXME
-					panic(err)
-				}
-
-				// Remove drained allocs for replacement allocs
-				for _, alloc := range jobAllocs {
-					if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
-						delete(allocs, alloc.PreviousAllocation)
-					}
-				}
-
-				//FIXME why are we doing a nested loop over allocs?
-				// Any remaining allocs need to be tracked
-				for allocID, alloc := range allocs {
-					tg := job.LookupTaskGroup(alloc.TaskGroup)
-					if tg == nil {
-						logger.Printf("[DEBUG] nomad.drain: unable to find task group %q for alloc %q", alloc.TaskGroup, allocID)
-						continue
-					}
-
-					if tg.Migrate == nil {
-						// No migrate strategy so don't track
-						continue
-					}
-
-					//FIXME Remove this? ModifyTime is not updated as expected
-
-					// alloc.ModifyTime + HealthyDeadline is >= the
-					// healthy deadline for the allocation, so we
-					// can stop tracking it at that time.
-					deadline := time.Unix(0, alloc.ModifyTime).Add(tg.Migrate.HealthyDeadline)
-
-					if deadline.After(now) {
-						// deadline already reached; don't bother tracking
-						continue
-					}
-
-					// Draining allocation hasn't been replaced or
-					// reached its deadline; track it!
-					drainingAllocs[allocID] = newDrainingAlloc(alloc, deadline)
-				}
-			}
+			jobs[jobKey{alloc.Namespace, alloc.JobID}] = node.ID
 		}
 	}
 
@@ -748,5 +731,5 @@ func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*struc
 	if allocsIndex == 0 {
 		allocsIndex = 1
 	}
-	return nodes, nodesIndex, drainingAllocs, allocsIndex
+	return nodes, nodesIndex, jobs, allocsIndex
 }
diff --git a/nomad/drain_test.go b/nomad/drain_test.go
index bf1ec875d..e611fbdee 100644
--- a/nomad/drain_test.go
+++ b/nomad/drain_test.go
@@ -62,6 +62,7 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
 	systemJob.TaskGroups[0].Tasks[0].Resources = structs.MinResources()
 	systemJob.TaskGroups[0].Tasks[0].Services = nil
 
+	// Batch job will run until the node's drain deadline is reached
 	batchJob := mock.Job()
 	batchJob.Name = "batch-job"
 	batchJob.Type = structs.JobTypeBatch
@@ -134,6 +135,7 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
 				t.Logf("%d alloc %s job %s status %s", i, alloc.ID, alloc.Job.Name, alloc.ClientStatus)
 			}
 		}
+		server.logger.Println("----------------------------------------------------------------------quitting--------------------------------------------------------")
 		t.Fatalf("failed waiting for all allocs to start: %v", err)
 	})
 
@@ -182,10 +184,10 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
 				t.Logf("%d alloc %s job %s status %s prev %s", i, alloc.ID, alloc.Job.Name, alloc.ClientStatus, alloc.PreviousAllocation)
 			}
 		}
-		t.Fatalf("failed waiting for all allocs to start: %v", err)
+		server.logger.Println("----------------------------------------------------------------------quitting--------------------------------------------------------")
+		t.Errorf("failed waiting for all allocs to migrate: %v", err)
 	})
 
-	// Wait for all service allocs to be replaced
 	jobs, err := rpc.JobList()
 	require.Nil(err)
 	t.Logf("%d jobs", len(jobs.Jobs))
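
Illustrative sketch (not part of the patch): the drain-list logic added in drain.go keeps at least Count - Migrate.MaxParallel allocations of a task group "up" and only adds an allocation to the stoplist while the tracked up-count stays above that minimum. The taskGroup type, canStopOne helper, and the numbers below are hypothetical stand-ins for illustration only, not Nomad APIs.

package main

import "fmt"

// taskGroup holds only the two fields the drain arithmetic needs.
// It is an illustrative stand-in, not Nomad's structs.TaskGroup.
type taskGroup struct {
	Count       int // desired allocations for the group
	MaxParallel int // migrate stanza's max_parallel
}

// canStopOne reports whether one more allocation may be drained while
// still keeping at least Count-MaxParallel allocations up.
func canStopOne(tg taskGroup, up int) bool {
	minUp := tg.Count - tg.MaxParallel
	return minUp < up
}

func main() {
	tg := taskGroup{Count: 3, MaxParallel: 1}
	up := 3 // healthy, non-terminal allocations currently tracked

	// Drain as many allocations as the policy allows right now,
	// mirroring the stoplist.add + upPerTG[tgKey]-- pattern above.
	stopped := 0
	for canStopOne(tg, up) {
		up--
		stopped++
	}
	fmt.Printf("stopped %d allocs, %d still up (minimum %d)\n",
		stopped, up, tg.Count-tg.MaxParallel)
	// Output: stopped 1 allocs, 2 still up (minimum 2)
}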