drainer: switch to job based watching

Michael Schurter
2018-02-21 17:22:06 -08:00
parent 48d637dad1
commit 7deabe958d
2 changed files with 226 additions and 241 deletions


@@ -13,6 +13,12 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// jobKey is a tuple of namespace+jobid for use as a map key
type jobKey struct {
ns string
jobid string
}
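// Sketch (not part of this commit): jobKey is comparable, so it can index a
// map directly. This replaces the old strings.Join-based "namespace-jobid"
// string key and is how drainable, skipJob, jobBatch, and the jobWatcher's
// jobs map are keyed below.
func exampleJobKeyLookup() bool {
	seen := map[jobKey]struct{}{}
	seen[jobKey{ns: "default", jobid: "web"}] = struct{}{}
	// The same tuple always maps to the same key.
	_, ok := seen[jobKey{"default", "web"}]
	return ok
}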
// drainingJob contains the Job and allocations for that job meant to be used
// when collecting all allocations for a job with at least one allocation on a
// draining node.
@@ -48,19 +54,14 @@ func makeTaskGroupKey(a *structs.Allocation) string {
// stopAllocs tracks allocs to drain by a unique TG key
type stopAllocs struct {
perTaskGroup map[string]int
allocBatch []*structs.Allocation
allocBatch []*structs.Allocation
// namespace+jobid -> Job
jobBatch map[string]*structs.Job
jobBatch map[jobKey]*structs.Job
}
//FIXME this method does an awful lot
func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
// Increment the counter for how many allocs in this task group are being stopped
tgKey := makeTaskGroupKey(a)
s.perTaskGroup[tgKey]++
// Update the allocation
a.ModifyTime = time.Now().UnixNano()
a.DesiredStatus = structs.AllocDesiredStatusStop
@@ -69,8 +70,7 @@ func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
s.allocBatch = append(s.allocBatch, a)
// Add job to the job batch
jobKey := strings.Join([]string{j.Namespace, j.ID}, "-")
s.jobBatch[jobKey] = j
s.jobBatch[jobKey{a.Namespace, a.JobID}] = j
}
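// Sketch (not part of this commit): how a caller of stopAllocs.add ends up
// with one batch of allocs to stop, a per-task-group stop count, and each
// affected job recorded exactly once (the jobKey deduplicates it). Assumes
// the same package; the hypothetical helper just wraps the fields shown above.
func buildStoplist(j *structs.Job, allocs []*structs.Allocation) *stopAllocs {
	s := &stopAllocs{
		perTaskGroup: map[string]int{},
		allocBatch:   make([]*structs.Allocation, 0, len(allocs)),
		jobBatch:     map[jobKey]*structs.Job{},
	}
	for _, a := range allocs {
		s.add(j, a) // marks a for stopping and records j once
	}
	return s
}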
// startNodeDrainer should be called in establishLeadership by the leader.
@@ -87,7 +87,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}
}()
nodes, nodesIndex, drainingAllocs, allocsIndex := initDrainer(s.logger, state)
nodes, nodesIndex, drainingJobs, allocsIndex := initDrainer(s.logger, state)
// Wait for a node's drain deadline to expire
var nextDeadline time.Time
@@ -108,8 +108,9 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
go nodeWatcher.run(ctx)
// Watch for drained allocations to be replaced
prevAllocs := newPrevAllocWatcher(s.logger, drainingAllocs, allocsIndex, state)
go prevAllocs.run(ctx)
// Watch for changes in allocs for jobs with allocs on draining nodes
jobWatcher := newJobWatcher(s.logger, drainingJobs, allocsIndex, state)
go jobWatcher.run(ctx)
for {
//TODO this method of async node updates means we could make
@@ -117,16 +118,43 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
//possible outcome of this is that an allocation could be
//stopped on a node that recently had its drain cancelled which
//doesn't seem like that bad of a pathological case
s.logger.Printf("[TRACE] nomad.drain: LOOP next deadline: %s (%s)", nextDeadline, time.Until(nextDeadline))
select {
case nodes = <-nodeWatcher.nodesCh:
// update draining nodes
//TODO remove allocs from draining list with node ids not in this map
s.logger.Printf("[TRACE] nomad.drain: running due to node change (%d nodes draining)", len(nodes))
case drainedID := <-prevAllocs.allocsCh:
// drained alloc has been replaced
//TODO instead of modifying a view of draining allocs here created a shared map like prevallocs
delete(drainingAllocs, drainedID)
s.logger.Printf("[TRACE] nomad.drain: running due to alloc change (%s replaced)", drainedID)
// update deadline timer
changed := false
for _, n := range nodes {
if nextDeadline.IsZero() {
nextDeadline = n.DrainStrategy.DeadlineTime()
changed = true
continue
}
if deadline := n.DrainStrategy.DeadlineTime(); deadline.Before(nextDeadline) {
nextDeadline = deadline
changed = true
}
}
// if changed reset the timer
if changed {
s.logger.Printf("[TRACE] nomad.drain: new node deadline: %s", nextDeadline)
if !deadlineTimer.Stop() {
// timer may have been recv'd in a
// previous loop, so don't block
select {
case <-deadlineTimer.C:
default:
}
}
deadlineTimer.Reset(time.Until(nextDeadline))
}
case jobs := <-jobWatcher.WaitCh():
s.logger.Printf("[TRACE] nomad.drain: running due to alloc change (%d jobs updated)", len(jobs))
case when := <-deadlineTimer.C:
// deadline for a node was reached
s.logger.Printf("[TRACE] nomad.drain: running due to deadline reached (at %s)", when)
@@ -148,7 +176,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}
now := time.Now() // for determining deadlines in a consistent way
// namespace -> job id -> {job, allocs}
// job key -> {job, allocs}
// Collect all allocs for all jobs with at least one
// alloc on a draining node.
// Invariants:
@@ -156,7 +184,15 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// - No batch jobs unless their node's deadline is reached
// - No entries with 0 allocs
//TODO could this be a helper method on prevAllocWatcher
drainable := map[string]map[string]*drainingJob{}
drainable := map[jobKey]*drainingJob{}
// track jobs we've already looked up and know we shouldn't
// consider for draining, e.g. system jobs
skipJob := map[jobKey]struct{}{}
// track number of "up" allocs per task group (not terminal and
// have a deployment status)
upPerTG := map[string]int{}
// Collect all drainable jobs
for nodeID, node := range nodes {
@@ -169,13 +205,15 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// track whether this node has allocs left to be drained
allocsLeft := false
for _, alloc := range allocs {
if _, ok := drainable[alloc.Namespace]; !ok {
// namespace does not exist
drainable[alloc.Namespace] = make(map[string]*drainingJob)
jobkey := jobKey{alloc.Namespace, alloc.JobID}
if _, ok := drainable[jobkey]; ok {
// already found
continue
}
if _, ok := drainable[alloc.Namespace][alloc.JobID]; ok {
// already found
if _, ok := skipJob[jobkey]; ok {
// already looked up and skipped
continue
}
@@ -185,21 +223,27 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
//FIXME
panic(err)
}
//TODO check for job == nil?
// Don't bother collecting system jobs
if job.Type == structs.JobTypeSystem {
skipJob[jobkey] = struct{}{}
s.logger.Printf("[TRACE] nomad.drain: skipping system job %s", job.Name)
continue
}
// If a drainable alloc isn't yet stopping this
// node has allocs left to be drained
// If alloc isn't yet terminal this node has
// allocs left to be drained
if !alloc.TerminalStatus() {
allocsLeft = true
if !allocsLeft {
s.logger.Printf("[TRACE] nomad.drain: node %s has allocs left to drain", nodeID[:6])
allocsLeft = true
}
}
// Don't bother collecting batch jobs for nodes that haven't hit their deadline
if job.Type == structs.JobTypeBatch && node.DrainStrategy.DeadlineTime().After(now) {
s.logger.Printf("[TRACE] nomad.drain: not draining batch job %s because deadline isn't for %s", job.Name, node.DrainStrategy.DeadlineTime().Sub(now))
skipJob[jobkey] = struct{}{}
continue
}
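// Sketch (not part of this commit): the skip rules applied in the loop above,
// written as a hypothetical predicate. System jobs are never drained by this
// loop, and batch jobs are left alone until their node's drain deadline has
// passed.
func shouldSkipJob(job *structs.Job, node *structs.Node, now time.Time) bool {
	if job.Type == structs.JobTypeSystem {
		return true
	}
	if job.Type == structs.JobTypeBatch && node.DrainStrategy.DeadlineTime().After(now) {
		return true
	}
	return false
}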
@@ -209,100 +253,109 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
panic(err)
}
drainable[alloc.Namespace][alloc.JobID] = &drainingJob{
// Count the number of up (non-terminal with a deployment status) allocs per task group
if job.Type == structs.JobTypeService {
n := 0
for _, a := range jobAllocs {
if !a.TerminalStatus() && a.DeploymentStatus != nil {
upPerTG[makeTaskGroupKey(a)]++
n++
}
}
s.logger.Printf("[TRACE] nomad.drain: job %s has %d task groups running", job.Name, n)
}
drainable[jobkey] = &drainingJob{
job: job,
allocs: jobAllocs,
}
jobWatcher.watch(jobkey, nodeID)
}
// if node has no allocs, it's done draining!
if !allocsLeft {
s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain", nodeID)
jobWatcher.nodeDone(nodeID)
delete(nodes, nodeID)
doneNodes[nodeID] = node
}
}
// Initialize stoplist with a count of allocs already draining per task group
//TODO wrap this up in a new func
// stoplist holds the allocations to stop and the jobs to emit
// evaluations for
stoplist := &stopAllocs{
perTaskGroup: make(map[string]int, len(drainingAllocs)),
allocBatch: make([]*structs.Allocation, len(drainingAllocs)),
jobBatch: make(map[string]*structs.Job),
}
// initialize perTaskGroup to be the number of total *currently draining* allocations per task group
for _, a := range drainingAllocs {
stoplist.perTaskGroup[a.tgKey]++
allocBatch: make([]*structs.Allocation, 0, len(drainable)),
jobBatch: make(map[jobKey]*structs.Job),
}
// deadlineNodes maps node IDs that have reached their drain
// deadline to the number of allocs stopped due to that deadline
deadlineNodes := map[string]int{}
//TODO build drain list considering deadline & max_parallel
for _, drainingJobs := range drainable {
for _, drainingJob := range drainingJobs {
for _, alloc := range drainingJob.allocs {
// Already draining/dead allocs don't need to be drained
if alloc.TerminalStatus() {
continue
}
// build drain list considering deadline & max_parallel
for _, drainingJob := range drainable {
for _, alloc := range drainingJob.allocs {
// Already draining/dead allocs don't need to be drained
if alloc.TerminalStatus() {
continue
}
node, ok := nodes[alloc.NodeID]
if !ok {
// Alloc's node is not draining so not eligible for draining!
continue
}
node, ok := nodes[alloc.NodeID]
if !ok {
// Alloc's node is not draining so not eligible for draining!
continue
}
if node.DrainStrategy.DeadlineTime().Before(now) {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// Alloc's Node has reached its deadline
stoplist.add(drainingJob.job, alloc)
tgKey := makeTaskGroupKey(alloc)
deadlineNodes[node.ID]++
if node.DrainStrategy.DeadlineTime().Before(now) {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to node's drain deadline", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// Alloc's Node has reached its deadline
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
//FIXME purge from watchlist?
continue
}
deadlineNodes[node.ID]++
continue
}
// Batch jobs are only stopped when the node
// deadline is reached which has already been
// done.
if drainingJob.job.Type == structs.JobTypeBatch {
continue
}
// Batch jobs are only stopped when the node
// deadline is reached which has already been
// done.
if drainingJob.job.Type == structs.JobTypeBatch {
continue
}
// Stop allocs with count=1, max_parallel==0, or draining<max_parallel
tg := drainingJob.job.LookupTaskGroup(alloc.TaskGroup)
//FIXME tg==nil here?
// Stop allocs with count=1, max_parallel==0, or draining<max_parallel
tg := drainingJob.job.LookupTaskGroup(alloc.TaskGroup)
//FIXME tg==nil here?
// Only 1, drain
if tg.Count == 1 {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to count=1", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// Only 1, drain
if tg.Count == 1 {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to count=1", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// No migrate strategy or a max parallel of 0 means force draining
if tg.Migrate == nil || tg.Migrate.MaxParallel == 0 {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to force drain", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// No migrate strategy or a max parallel of 0 means force draining
if tg.Migrate == nil || tg.Migrate.MaxParallel == 0 {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to force drain", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
stoplist.add(drainingJob.job, alloc)
continue
}
// If MaxParallel > how many allocs are
// already draining for this task
// group, drain and track this alloc
tgKey := makeTaskGroupKey(alloc)
s.logger.Printf("[TRACE] nomad.drain: considering job %s alloc %s count %d maxp %d up %d",
drainingJob.job.Name, alloc.ID[:6], tg.Count, tg.Migrate.MaxParallel, upPerTG[tgKey])
//FIXME change this to be based off of the sum(deploymentstatus!=nil && clientstatus==running) for this task group
if tg.Migrate.MaxParallel > stoplist.perTaskGroup[tgKey] {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// More migrations are allowed, add to stoplist
stoplist.add(drainingJob.job, alloc)
// Count - MaxParallel = minimum number of allocations that must be "up"
minUp := (tg.Count - tg.Migrate.MaxParallel)
// Also add to prevAllocWatcher
prevAllocs.watch(alloc.ID)
}
// If the minimum is less than the number currently up, it is safe to stop one.
if minUp < upPerTG[tgKey] {
s.logger.Printf("[TRACE] nomad.drain: draining job %s alloc %s from node %s due to max parallel", drainingJob.job.Name, alloc.ID[:6], alloc.NodeID[:6])
// More migrations are allowed, add to stoplist
stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]--
}
}
}
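// Sketch (not part of this commit): the migrate throttling above in isolation.
// Count - MaxParallel is the minimum number of allocs that must stay up for a
// task group; an alloc may only be stopped while the number currently up is
// above that floor, and each stop decrements the count so the next alloc in
// the same group sees the updated figure. Hypothetical helper, same package.
func mayStopOne(count, maxParallel int, upPerTG map[string]int, tgKey string) bool {
	minUp := count - maxParallel
	if minUp < upPerTG[tgKey] {
		upPerTG[tgKey]-- // one fewer alloc will be up after this stop
		return true
	}
	return false
}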
@@ -310,6 +363,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// log drains due to node deadlines
for nodeID, remaining := range deadlineNodes {
s.logger.Printf("[DEBUG] nomad.drain: node %s drain deadline reached; stopping %d remaining allocs", nodeID, remaining)
jobWatcher.nodeDone(nodeID)
}
if len(stoplist.allocBatch) > 0 {
@@ -425,17 +479,16 @@ func (n *nodeWatcher) run(ctx context.Context) {
newNodes := resp.([]*structs.Node)
n.logger.Printf("[TRACE] nomad.drain: %d nodes to consider", len(newNodes)) //FIXME remove
for _, newNode := range newNodes {
if _, ok := n.nodes[newNode.ID]; ok {
// Node was draining
if existingNode, ok := n.nodes[newNode.ID]; ok {
// Node was draining, see if it has changed
if !newNode.Drain {
// Node stopped draining
delete(n.nodes, newNode.ID)
changed = true
} else {
} else if !newNode.DrainStrategy.DeadlineTime().Equal(existingNode.DrainStrategy.DeadlineTime()) {
// Update deadline
n.nodes[newNode.ID] = newNode
//FIXME set changed if it changed?
//changed = true
changed = true
}
} else {
// Node was not draining
@@ -492,73 +545,78 @@ func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore)
return resp, index, nil
}
// prevAllocWatcher monitors allocation updates for allocations which replace
// draining allocations.
type prevAllocWatcher struct {
// watchList is a map of alloc ids to look for in PreviousAllocation
// fields of new allocs
watchList map[string]struct{}
watchListMu sync.Mutex
type jobWatcher struct {
// allocsIndex to start watching from
allocsIndex uint64
// job -> node.ID
jobs map[jobKey]string
jobsMu sync.Mutex
jobsCh chan map[jobKey]struct{}
state *state.StateStore
// allocIndex to start watching from
allocIndex uint64
// allocsCh is sent Allocation.IDs as they're removed from the watchList
allocsCh chan string
logger *log.Logger
}
// newPrevAllocWatcher creates a new prevAllocWatcher watching drainingAllocs
// from allocIndex in the state store. Must call run to start watching.
func newPrevAllocWatcher(logger *log.Logger, drainingAllocs map[string]drainingAlloc, allocIndex uint64,
state *state.StateStore) *prevAllocWatcher {
watchList := make(map[string]struct{}, len(drainingAllocs))
for allocID := range drainingAllocs {
watchList[allocID] = struct{}{}
}
return &prevAllocWatcher{
watchList: watchList,
state: state,
allocIndex: allocIndex,
allocsCh: make(chan string, 8), //FIXME 8? really? what should this be
logger: logger,
func newJobWatcher(logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, state *state.StateStore) *jobWatcher {
return &jobWatcher{
allocsIndex: allocsIndex,
logger: logger,
jobs: jobs,
jobsCh: make(chan map[jobKey]struct{}),
state: state,
}
}
// watch for an allocation ID to be replaced.
func (p *prevAllocWatcher) watch(allocID string) {
p.watchListMu.Lock()
defer p.watchListMu.Unlock()
p.watchList[allocID] = struct{}{}
func (j *jobWatcher) watch(k jobKey, nodeID string) {
j.logger.Printf("[TRACE] nomad.drain: watching job %s on draining node %s", k.jobid, nodeID[:6])
j.jobsMu.Lock()
j.jobs[k] = nodeID
j.jobsMu.Unlock()
}
// run the prevAllocWatcher and send replaced draining alloc IDs on allocsCh.
func (p *prevAllocWatcher) run(ctx context.Context) {
// index to watch from
func (j *jobWatcher) nodeDone(nodeID string) {
j.jobsMu.Lock()
defer j.jobsMu.Unlock()
for k, v := range j.jobs {
if v == nodeID {
j.logger.Printf("[TRACE] nomad.drain: UNwatching job %s on done draining node %s", k.jobid, nodeID[:6])
delete(j.jobs, k)
}
}
}
func (j *jobWatcher) WaitCh() <-chan map[jobKey]struct{} {
return j.jobsCh
}
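// Sketch (not part of this commit): how the drain loop wires up a jobWatcher.
// Jobs found on draining nodes are registered with watch(); run() blocks on
// allocation updates and delivers the set of changed job keys on WaitCh().
// The identifiers mirror this diff; the surrounding setup is assumed.
func exampleDrainLoop(ctx context.Context, logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, store *state.StateStore) {
	jw := newJobWatcher(logger, jobs, allocsIndex, store)
	go jw.run(ctx)
	for {
		select {
		case changed := <-jw.WaitCh():
			_ = changed // re-evaluate draining for the changed jobs
		case <-ctx.Done():
			return
		}
	}
}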
func (j *jobWatcher) run(ctx context.Context) {
var resp interface{}
var err error
for {
//FIXME have watchAllocs create a closure and give it a copy of j.jobs to remove locking?
//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
resp, p.allocIndex, err = p.state.BlockingQuery(p.queryPrevAlloc, p.allocIndex, ctx)
var newIndex uint64
resp, newIndex, err = j.state.BlockingQuery(j.watchAllocs, j.allocsIndex, ctx)
if err != nil {
if err == context.Canceled {
p.logger.Printf("[TRACE] nomad.drain: previous allocation watcher shutting down")
j.logger.Printf("[TRACE] nomad.drain: job watcher shutting down")
return
}
p.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
j.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
return
}
allocIDs := resp.([]string)
for _, id := range allocIDs {
j.logger.Printf("[TRACE] nomad.drain: job watcher old index: %d new index: %d", j.allocsIndex, newIndex)
j.allocsIndex = newIndex
changedJobs := resp.(map[jobKey]struct{})
if len(changedJobs) > 0 {
select {
case p.allocsCh <- id:
case j.jobsCh <- changedJobs:
case <-ctx.Done():
return
}
@@ -566,8 +624,7 @@ func (p *prevAllocWatcher) run(ctx context.Context) {
}
}
// queryPrevAlloc is the BlockingQuery func for scanning for replacement allocs
func (p *prevAllocWatcher) queryPrevAlloc(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
func (j *jobWatcher) watchAllocs(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Allocs(ws)
if err != nil {
return nil, 0, err
@@ -578,11 +635,10 @@ func (p *prevAllocWatcher) queryPrevAlloc(ws memdb.WatchSet, state *state.StateS
return nil, 0, err
}
//FIXME do fine-grained locking around watchlist mutations?
p.watchListMu.Lock()
defer p.watchListMu.Unlock()
skipped := 0
resp := make([]string, 0, len(p.watchList))
// set of changed job keys
resp := map[jobKey]struct{}{}
for {
raw := iter.Next()
@@ -591,26 +647,35 @@ func (p *prevAllocWatcher) queryPrevAlloc(ws memdb.WatchSet, state *state.StateS
}
alloc := raw.(*structs.Allocation)
_, ok := p.watchList[alloc.PreviousAllocation]
j.jobsMu.Lock()
_, ok := j.jobs[jobKey{alloc.Namespace, alloc.JobID}]
j.jobsMu.Unlock()
if !ok {
// PreviousAllocation not in watchList, skip it
// alloc is not part of a draining job
skipped++
continue
}
// If the migration health is set on the replacement alloc we can stop watching the drained alloc
// don't wake drain loop if alloc hasn't updated its health
if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
delete(p.watchList, alloc.PreviousAllocation)
resp = append(resp, alloc.PreviousAllocation)
j.logger.Printf("[TRACE] nomad.drain: job watcher found alloc %s - deployment status: %t", alloc.ID[:6], *alloc.DeploymentStatus.Healthy)
resp[jobKey{alloc.Namespace, alloc.JobID}] = struct{}{}
} else {
j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring alloc %s - no deployment status", alloc.ID[:6])
}
}
j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring %d allocs - not part of draining job at index %d", skipped, index)
return resp, index, nil
}
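// Sketch (not part of this commit): the wake-up filter above, isolated. Only
// allocs that belong to a watched job and have reported deployment health
// (healthy or unhealthy) are returned to the drain loop; allocs with no
// deployment status yet are skipped so routine updates don't wake it.
// Hypothetical helper, same package assumed.
func shouldWake(a *structs.Allocation, watched map[jobKey]string) bool {
	if _, ok := watched[jobKey{a.Namespace, a.JobID}]; !ok {
		return false // not part of a draining job
	}
	return a.DeploymentStatus.IsHealthy() || a.DeploymentStatus.IsUnhealthy()
}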
// initDrainer initializes the node drainer state and returns a map of
// draining nodes, as well as a map of jobs with draining allocations to the
// node IDs those allocations are on.
func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*structs.Node, uint64, map[string]drainingAlloc, uint64) {
func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*structs.Node, uint64, map[jobKey]string, uint64) {
// StateStore.Snapshot never returns an error so don't bother checking it
snapshot, _ := state.Snapshot()
now := time.Now()
@@ -624,9 +689,8 @@ func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*struc
// map of draining nodes keyed by node ID
nodes := map[string]*structs.Node{}
//FIXME rollup by composite namespace+job.ID+tg key?
// List of draining allocs by namespace and job: namespace -> job.ID -> alloc.ID -> *Allocation
allocsByNS := map[string]map[string]map[string]*structs.Allocation{}
// map of draining job IDs keyed by {namespace, job id} -> node.ID
jobs := map[jobKey]string{}
for {
raw := iter.Next()
@@ -655,88 +719,7 @@ func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*struc
}
for _, alloc := range allocs {
//FIXME is it safe to assume the drainer set the desired status to stop?
if alloc.DesiredStatus == structs.AllocDesiredStatusStop {
if allocsByJob, ok := allocsByNS[alloc.Namespace]; ok {
if allocs, ok := allocsByJob[alloc.JobID]; ok {
allocs[alloc.ID] = alloc
} else {
// First alloc for job
allocsByJob[alloc.JobID] = map[string]*structs.Allocation{alloc.ID: alloc}
}
} else {
// First alloc in namespace
allocsByNS[alloc.Namespace] = map[string]map[string]*structs.Allocation{
alloc.JobID: map[string]*structs.Allocation{alloc.ID: alloc},
}
}
}
}
}
// drainingAllocs is the list of all allocations that are currently
// draining and waiting for a replacement
drainingAllocs := map[string]drainingAlloc{}
for ns, allocsByJobs := range allocsByNS {
for jobID, allocs := range allocsByJobs {
for allocID, alloc := range allocs {
job, err := snapshot.JobByID(nil, ns, jobID)
if err != nil {
logger.Printf("[ERR] nomad.drain: error getting job %q for alloc %q: %v", alloc.JobID, allocID, err)
//FIXME
panic(err)
}
// Don't track drains for stopped or gc'd jobs
if job == nil || job.Status == structs.JobStatusDead {
continue
}
jobAllocs, err := snapshot.AllocsByJob(nil, ns, jobID, true)
if err != nil {
//FIXME
panic(err)
}
// Remove drained allocs for replacement allocs
for _, alloc := range jobAllocs {
if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
delete(allocs, alloc.PreviousAllocation)
}
}
//FIXME why are we doing a nested loop over allocs?
// Any remaining allocs need to be tracked
for allocID, alloc := range allocs {
tg := job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
logger.Printf("[DEBUG] nomad.drain: unable to find task group %q for alloc %q", alloc.TaskGroup, allocID)
continue
}
if tg.Migrate == nil {
// No migrate strategy so don't track
continue
}
//FIXME Remove this? ModifyTime is not updated as expected
// alloc.ModifyTime + HealthyDeadline is >= the
// healthy deadline for the allocation, so we
// can stop tracking it at that time.
deadline := time.Unix(0, alloc.ModifyTime).Add(tg.Migrate.HealthyDeadline)
if deadline.After(now) {
// deadline already reached; don't bother tracking
continue
}
// Draining allocation hasn't been replaced or
// reached its deadline; track it!
drainingAllocs[allocID] = newDrainingAlloc(alloc, deadline)
}
}
jobs[jobKey{alloc.Namespace, alloc.JobID}] = node.ID
}
}
@@ -748,5 +731,5 @@ func initDrainer(logger *log.Logger, state *state.StateStore) (map[string]*struc
if allocsIndex == 0 {
allocsIndex = 1
}
return nodes, nodesIndex, drainingAllocs, allocsIndex
return nodes, nodesIndex, jobs, allocsIndex
}


@@ -62,6 +62,7 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
systemJob.TaskGroups[0].Tasks[0].Resources = structs.MinResources()
systemJob.TaskGroups[0].Tasks[0].Services = nil
// Batch job will run until the node's drain deadline is reached
batchJob := mock.Job()
batchJob.Name = "batch-job"
batchJob.Type = structs.JobTypeBatch
@@ -134,6 +135,7 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Logf("%d alloc %s job %s status %s", i, alloc.ID, alloc.Job.Name, alloc.ClientStatus)
}
}
server.logger.Println("----------------------------------------------------------------------quitting--------------------------------------------------------")
t.Fatalf("failed waiting for all allocs to start: %v", err)
})
@@ -182,10 +184,10 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Logf("%d alloc %s job %s status %s prev %s", i, alloc.ID, alloc.Job.Name, alloc.ClientStatus, alloc.PreviousAllocation)
}
}
t.Fatalf("failed waiting for all allocs to start: %v", err)
server.logger.Println("----------------------------------------------------------------------quitting--------------------------------------------------------")
t.Errorf("failed waiting for all allocs to migrate: %v", err)
})
// Wait for all service allocs to be replaced
jobs, err := rpc.JobList()
require.Nil(err)
t.Logf("%d jobs", len(jobs.Jobs))