From a96fc052dd50d3ffcd9b7052accb90cdf5d73eb2 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 21 Jul 2017 16:17:42 -0700 Subject: [PATCH] Fix tr race by not sharing alloc/task prestart only needs the original alloc/task so pass their pointers in. Task updates may concurrently replace the pointer on tr. --- client/task_runner.go | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index 605db589e..4e0ca6abd 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -867,10 +867,12 @@ func (r *TaskRunner) updatedTokenHandler() { } // prestart handles life-cycle tasks that occur before the task has started. -func (r *TaskRunner) prestart(resultCh chan bool) { - if r.task.Vault != nil { +// Since it's run asynchronously with the main Run() loop the alloc & task are +// passed in to avoid racing with updates. +func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, resultCh chan bool) { + if task.Vault != nil { // Wait for the token - r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID) + r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", task.Name, alloc.ID) tokenCh := r.vaultFuture.Wait() select { case <-tokenCh: @@ -878,16 +880,16 @@ func (r *TaskRunner) prestart(resultCh chan bool) { resultCh <- false return } - r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID) - r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env) + r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", task.Name, alloc.ID) + r.envBuilder.SetVaultToken(r.vaultFuture.Get(), task.Vault.Env) } // If the job is a dispatch job and there is a payload write it to disk - requirePayload := len(r.alloc.Job.Payload) != 0 && + requirePayload := len(alloc.Job.Payload) != 0 && (r.task.DispatchPayload != nil && r.task.DispatchPayload.File != "") if !r.payloadRendered && requirePayload { - renderTo := filepath.Join(r.taskDir.LocalDir, r.task.DispatchPayload.File) - decoded, err := snappy.Decode(nil, r.alloc.Job.Payload) + renderTo := filepath.Join(r.taskDir.LocalDir, task.DispatchPayload.File) + decoded, err := snappy.Decode(nil, alloc.Job.Payload) if err != nil { r.setState( structs.TaskStateDead, @@ -921,10 +923,10 @@ func (r *TaskRunner) prestart(resultCh chan bool) { r.persistLock.Unlock() // Download the task's artifacts - if !downloaded && len(r.task.Artifacts) > 0 { + if !downloaded && len(task.Artifacts) > 0 { r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) taskEnv := r.envBuilder.Build() - for _, artifact := range r.task.Artifacts { + for _, artifact := range task.Artifacts { if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil { wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) r.logger.Printf("[DEBUG] client: %v", wrapped) @@ -941,7 +943,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) { } // We don't have to wait for any template - if len(r.task.Templates) == 0 { + if len(task.Templates) == 0 { // Send the start signal select { case r.startCh <- struct{}{}: @@ -955,12 +957,12 @@ func (r *TaskRunner) prestart(resultCh chan bool) { // Build the template manager if r.templateManager == nil { var err error - r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, + r.templateManager, err = NewTaskTemplateManager(r, task.Templates, r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) - r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err) + r.logger.Printf("[ERR] client: alloc %q, task %q %v", alloc.ID, task.Name, err) resultCh <- false return } @@ -1022,7 +1024,7 @@ func (r *TaskRunner) run() { for { // Do the prestart activities prestartResultCh := make(chan bool, 1) - go r.prestart(prestartResultCh) + go r.prestart(r.alloc, r.task, prestartResultCh) WAIT: for {