Watch batch jobs

Alex Dadgar
2018-03-29 14:30:38 -07:00
parent a998cafde8
commit 251dd090d8
6 changed files with 40 additions and 29 deletions

View File

@@ -119,8 +119,9 @@ func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
return drain, nil
}
-// RunningServices returns the set of service jobs on the node.
-func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
+// DrainingJobs returns the set of jobs on the node that can block a drain.
+// These include batch and service jobs.
+func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) {
n.l.RLock()
defer n.l.RUnlock()
@@ -133,7 +134,7 @@ func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
jobIDs := make(map[structs.NamespacedID]struct{})
var jobs []structs.NamespacedID
for _, alloc := range allocs {
-if alloc.TerminalStatus() || alloc.Job.Type != structs.JobTypeService {
+if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
continue
}
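
Note on the change above: DrainingJobs now returns every non-terminal, non-system job on the node, which is how batch jobs start counting toward (and blocking) a drain. Below is a minimal, self-contained sketch of that filter; the types are simplified stand-ins, not the real nomad/structs definitions.

package main

import "fmt"

// Simplified stand-ins for structs.NamespacedID and structs.Allocation.
type namespacedID struct{ ID, Namespace string }

type allocation struct {
	jobID     string
	namespace string
	jobType   string // "service", "batch", or "system"
	terminal  bool
}

// drainingJobs mirrors the new filter: skip terminal allocs and system jobs,
// then dedupe the rest by namespaced job ID.
func drainingJobs(allocs []*allocation) []namespacedID {
	seen := make(map[namespacedID]struct{})
	var jobs []namespacedID
	for _, alloc := range allocs {
		if alloc.terminal || alloc.jobType == "system" {
			continue
		}
		id := namespacedID{ID: alloc.jobID, Namespace: alloc.namespace}
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}
		jobs = append(jobs, id)
	}
	return jobs
}

func main() {
	allocs := []*allocation{
		{jobID: "web", namespace: "default", jobType: "service"},
		{jobID: "etl", namespace: "default", jobType: "batch"},
		{jobID: "etl", namespace: "default", jobType: "batch", terminal: true},
		{jobID: "logs", namespace: "default", jobType: "system"},
	}
	// Both the service and the batch job block the drain; the system job and
	// the terminal alloc do not.
	fmt.Println(drainingJobs(allocs)) // [{web default} {etl default}]
}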

View File

@@ -46,9 +46,9 @@ func assertDrainingNode(t *testing.T, dn *drainingNode, isDone bool, remaining,
require.Nil(t, err)
assert.Len(t, allocs, remaining, "RemainingAllocs mismatch")
-jobs, err := dn.RunningServices()
+jobs, err := dn.DrainingJobs()
require.Nil(t, err)
-assert.Len(t, jobs, running, "RunningServices mismatch")
+assert.Len(t, jobs, running, "DrainingJobs mismatch")
}
func TestDrainingNode_Table(t *testing.T) {
@@ -70,7 +70,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "Batch",
isDone: false,
remaining: 1,
-running: 0,
+running: 1,
setup: func(t *testing.T, dn *drainingNode) {
alloc := mock.BatchAlloc()
alloc.NodeID = dn.node.ID
@@ -128,7 +128,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "ServiceTerminal",
isDone: false,
remaining: 2,
-running: 0,
+running: 1,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
@@ -146,7 +146,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "AllTerminalButBatch",
isDone: false,
remaining: 1,
-running: 0,
+running: 1,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
@@ -184,7 +184,7 @@ func TestDrainingNode_Table(t *testing.T) {
name: "HalfTerminal",
isDone: false,
remaining: 3,
-running: 1,
+running: 2,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{
mock.Alloc(),

View File

@@ -143,7 +143,7 @@ func (w *drainingJobWatcher) watch() {
for {
w.logger.Printf("[TRACE] nomad.drain.job_watcher: getting job allocs at index %d", waitIndex)
jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)
w.logger.Printf("[TRACE] nomad.drain.job_watcher: got job allocs %d at index %d: %v", len(jobAllocs), waitIndex, err)
w.logger.Printf("[TRACE] nomad.drain.job_watcher: got allocs for %d jobs at index %d: %v", len(jobAllocs), index, err)
if err != nil {
if err == context.Canceled {
// Determine if it is a cancel or a shutdown
@@ -205,8 +205,8 @@ func (w *drainingJobWatcher) watch() {
continue
}
-// Ignore all non-service jobs
-if job.Type != structs.JobTypeService {
+// Ignore any system jobs
+if job.Type == structs.JobTypeSystem {
w.deregisterJob(job.ID, job.Namespace)
continue
}
@@ -299,11 +299,12 @@ func (r *jobResult) String() string {
// handleJob takes the state of a draining job and returns the desired actions.
func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Allocation, lastHandledIndex uint64) (*jobResult, error) {
r := newJobResult()
+batch := job.Type == structs.JobTypeBatch
taskGroups := make(map[string]*structs.TaskGroup, len(job.TaskGroups))
for _, tg := range job.TaskGroups {
-if tg.Migrate != nil {
-// TODO handle the upgrade path
-// Only capture the groups that have a migrate strategy
+// Only capture the groups that have a migrate strategy or we are just
+// watching batch
+if tg.Migrate != nil || batch {
taskGroups[tg.Name] = tg
}
}
@@ -320,7 +321,7 @@ func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Al
for name, tg := range taskGroups {
allocs := tgAllocs[name]
-if err := handleTaskGroup(snap, tg, allocs, lastHandledIndex, r); err != nil {
+if err := handleTaskGroup(snap, batch, tg, allocs, lastHandledIndex, r); err != nil {
return nil, fmt.Errorf("drain for task group %q failed: %v", name, err)
}
}
@@ -328,8 +329,11 @@ func handleJob(snap *state.StateSnapshot, job *structs.Job, allocs []*structs.Al
return r, nil
}
-// handleTaskGroup takes the state of a draining task group and computes the desired actions.
-func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
+// handleTaskGroup takes the state of a draining task group and computes the
+// desired actions. For batch jobs we only notify when they have been migrated
+// and never mark them for drain. Batch jobs are allowed to complete up until
+// the deadline, after which they are force killed.
+func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGroup,
allocs []*structs.Allocation, lastHandledIndex uint64, result *jobResult) error {
// Determine how many allocations can be drained
@@ -363,9 +367,10 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
continue
}
-// If the alloc is running and has its deployment status set, it is
-// considered healthy from a migration standpoint.
-if !alloc.TerminalStatus() &&
+// If the service alloc is running and has its deployment status set, it
+// is considered healthy from a migration standpoint.
+if !batch &&
+!alloc.TerminalStatus() &&
alloc.DeploymentStatus != nil &&
alloc.DeploymentStatus.Healthy != nil {
healthy++
@@ -384,7 +389,7 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
// If we haven't marked this allocation for migration already, capture
// it as eligible for draining.
-if !alloc.DesiredTransition.ShouldMigrate() {
+if !batch && !alloc.DesiredTransition.ShouldMigrate() {
drainable = append(drainable, alloc)
}
}
@@ -394,6 +399,11 @@ func handleTaskGroup(snap *state.StateSnapshot, tg *structs.TaskGroup,
result.done = false
}
+// We don't mark batch for drain so exit
+if batch {
+return nil
+}
// Determine how many we can drain
thresholdCount := tg.Count - tg.Migrate.MaxParallel
numToDrain := healthy - thresholdCount
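
Note on the change above: handleJob now captures batch task groups even when they have no migrate stanza and passes a batch flag down to handleTaskGroup, which only records batch allocations as migrated once they stop and never marks them for drain; draining itself still applies only to service groups. Below is a trimmed, runnable sketch of that flow, assuming simplified types and ignoring the node-ID and index filtering the real function performs.

package main

import "fmt"

// Trimmed stand-ins; the real code also checks node ID, create index, and
// deployment health, which this sketch leaves out.
type alloc struct {
	terminal      bool // client reported the alloc as stopped/complete
	healthy       bool // deployment status healthy (services only)
	shouldMigrate bool // already marked for migration
}

type jobResult struct {
	drain    []*alloc
	migrated []*alloc
	done     bool
}

// handleTaskGroupSketch shows the batch-aware flow: batch allocs are only
// watched for completion and never marked for drain; service allocs are
// drained up to the migrate stanza's max_parallel.
func handleTaskGroupSketch(batch bool, count, maxParallel int, allocs []*alloc, r *jobResult) {
	healthy := 0
	var drainable []*alloc
	for _, a := range allocs {
		if a.terminal {
			r.migrated = append(r.migrated, a)
			continue
		}
		// A running alloc on the draining node means the drain is not done.
		r.done = false
		if !batch && a.healthy {
			healthy++
		}
		if !batch && !a.shouldMigrate {
			drainable = append(drainable, a)
		}
	}
	// Batch jobs are never marked for drain; they run until the deadline.
	if batch {
		return
	}
	numToDrain := healthy - (count - maxParallel)
	if numToDrain <= 0 {
		return
	}
	if numToDrain > len(drainable) {
		numToDrain = len(drainable)
	}
	r.drain = append(r.drain, drainable[:numToDrain]...)
}

func main() {
	r := &jobResult{done: true}
	batchAllocs := []*alloc{{terminal: true}, {}, {}}
	handleTaskGroupSketch(true, 3, 1, batchAllocs, r)
	fmt.Println(len(r.drain), len(r.migrated), r.done) // 0 1 false
}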

View File

@@ -554,7 +554,7 @@ func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
require.Nil(err)
res := newJobResult()
-require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 102, res))
+require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 102, res))
assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d",
tc.ExpectedDrained, len(res.drain))
assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d",
@@ -605,13 +605,13 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
// Handle before and after indexes
res := newJobResult()
-require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 101, res))
+require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 10)
require.True(res.done)
res = newJobResult()
-require.Nil(handleTaskGroup(snap, job.TaskGroups[0], allocs, 103, res))
+require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)

View File

@@ -68,12 +68,12 @@ func (n *NodeDrainer) Update(node *structs.Node) {
// TODO Test this
// Register interest in the draining jobs.
-jobs, err := draining.RunningServices()
+jobs, err := draining.DrainingJobs()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: error retrieving services on node %q: %v", node.ID, err)
n.logger.Printf("[ERR] nomad.drain: error retrieving draining jobs on node %q: %v", node.ID, err)
return
}
n.logger.Printf("[TRACE] nomad.drain: node %q has %d services on it", node.ID, len(jobs))
n.logger.Printf("[TRACE] nomad.drain: node %q has %d draining jobs on it", node.ID, len(jobs))
n.jobWatcher.RegisterJobs(jobs)
// TODO Test at this layer as well that a node drain on a node without

View File

@@ -609,7 +609,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
new.ClientStatus = structs.AllocClientStatusComplete
updates = append(updates, new)
}
-require.Nil(state.UpsertAllocs(1000, updates))
+require.Nil(state.UpdateAllocsFromClient(1000, updates))
// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
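
Note on the change above: the test now writes the completed allocations through UpdateAllocsFromClient instead of UpsertAllocs, i.e. through the same state-store path a client status update takes, which is what the drainer observes in practice. A rough sketch of the pattern, with allocs, state, and require assumed from the surrounding test:

// Sketch only: mark each alloc complete the way a client update would.
var updates []*structs.Allocation
for _, alloc := range allocs {
	stopped := alloc.Copy()
	stopped.ClientStatus = structs.AllocClientStatusComplete
	updates = append(updates, stopped)
}
// UpdateAllocsFromClient applies the client-reported fields; UpsertAllocs
// would behave like a server-side write instead.
require.Nil(state.UpdateAllocsFromClient(1000, updates))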