Add Leader support to client

This commit is contained in:
Alex Dadgar
2017-02-10 17:55:19 -08:00
parent a22f1647f9
commit edbc84087c
6 changed files with 106 additions and 15 deletions

View File

@@ -257,10 +257,10 @@ const (
TaskNotRestarting = "Not Restarting"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
TaskVaultRenewalFailed = "Vault token renewal failed"
TaskSiblingFailed = "Sibling task failed"
TaskSiblingFailed = "Sibling Task Failed"
TaskSignaling = "Signaling"
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
)
// TaskEvent is an event that effects the state of a task and contains meta-data

View File

@@ -383,19 +383,39 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.State = state
if state == structs.TaskStateDead {
// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
var destroyingTasks []string
for task, tr := range r.tasks {
if task != taskName {
destroyingTasks = append(destroyingTasks, task)
tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
}
}
if len(destroyingTasks) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
// Find all tasks that are not the one that is dead and check if the one
// that is dead is a leader
var otherTaskRunners []*TaskRunner
var otherTaskNames []string
leader := false
for task, tr := range r.tasks {
if task != taskName {
otherTaskRunners = append(otherTaskRunners, tr)
otherTaskNames = append(otherTaskNames, task)
} else if tr.task.Leader {
leader = true
}
}
// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
}
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
} else if leader {
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskLeaderDead))
}
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: leader task %q is dead, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
}
// If the task was a leader task we should kill all the other
// tasks.
}
select {

View File

@@ -663,6 +663,70 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
})
}
func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
upd, ar := testAllocRunner(false)
// Create two tasks in the task group
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Millisecond
task.Config = map[string]interface{}{
"run_for": "10s",
}
task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "task 2"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": "1s",
}
ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
ar.alloc.TaskResources[task2.Name] = task2.Resources
go ar.Run()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}
// Task One should be killed
state1 := last.TaskStates[task.Name]
if state1.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
}
if len(state1.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}
found := false
for _, e := range state1.Events {
if e.Type != structs.TaskLeaderDead {
found = true
}
}
if !found {
return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
}
// Task Two should be dead
state2 := last.TaskStates[task2.Name]
if state2.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()

View File

@@ -382,6 +382,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
}
case api.TaskDriverMessage:
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
}
// Reverse order so we are sorted by time

View File

@@ -2788,12 +2788,15 @@ const (
// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
TaskSiblingFailed = "Sibling task failed"
TaskSiblingFailed = "Sibling Task Failed"
// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"
// TaskLeaderDead indicates that the leader task within the has finished.
TaskLeaderDead = "Leader Task Dead"
)
// TaskEvent is an event that effects the state of a task and contains meta-data

View File

@@ -267,6 +267,8 @@ be specified using the `?region=` query parameter.
* `Failed Artifact Download` - Artifact(s) specified in the task failed to download.
* `Restart Signaled` - The task was signalled to be restarted.
* `Signaling` - The task was is being sent a signal.
* `Sibling task failed` - A task in the same task group failed.
* `Sibling Task Failed` - A task in the same task group failed.
* `Leader Task Dead` - The group's leader task is dead.
* `Driver` - A message from the driver.
Depending on the type the event will have applicable annotations.