mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
Fix tr race by not sharing alloc/task
prestart only needs the original alloc/task so pass their pointers in. Task updates may concurrently replace the pointer on tr.
This commit is contained in:
@@ -867,10 +867,12 @@ func (r *TaskRunner) updatedTokenHandler() {
|
||||
}
|
||||
|
||||
// prestart handles life-cycle tasks that occur before the task has started.
|
||||
func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
if r.task.Vault != nil {
|
||||
// Since it's run asynchronously with the main Run() loop the alloc & task are
|
||||
// passed in to avoid racing with updates.
|
||||
func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, resultCh chan bool) {
|
||||
if task.Vault != nil {
|
||||
// Wait for the token
|
||||
r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", task.Name, alloc.ID)
|
||||
tokenCh := r.vaultFuture.Wait()
|
||||
select {
|
||||
case <-tokenCh:
|
||||
@@ -878,16 +880,16 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
resultCh <- false
|
||||
return
|
||||
}
|
||||
r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID)
|
||||
r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env)
|
||||
r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", task.Name, alloc.ID)
|
||||
r.envBuilder.SetVaultToken(r.vaultFuture.Get(), task.Vault.Env)
|
||||
}
|
||||
|
||||
// If the job is a dispatch job and there is a payload write it to disk
|
||||
requirePayload := len(r.alloc.Job.Payload) != 0 &&
|
||||
requirePayload := len(alloc.Job.Payload) != 0 &&
|
||||
(r.task.DispatchPayload != nil && r.task.DispatchPayload.File != "")
|
||||
if !r.payloadRendered && requirePayload {
|
||||
renderTo := filepath.Join(r.taskDir.LocalDir, r.task.DispatchPayload.File)
|
||||
decoded, err := snappy.Decode(nil, r.alloc.Job.Payload)
|
||||
renderTo := filepath.Join(r.taskDir.LocalDir, task.DispatchPayload.File)
|
||||
decoded, err := snappy.Decode(nil, alloc.Job.Payload)
|
||||
if err != nil {
|
||||
r.setState(
|
||||
structs.TaskStateDead,
|
||||
@@ -921,10 +923,10 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
r.persistLock.Unlock()
|
||||
|
||||
// Download the task's artifacts
|
||||
if !downloaded && len(r.task.Artifacts) > 0 {
|
||||
if !downloaded && len(task.Artifacts) > 0 {
|
||||
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
|
||||
taskEnv := r.envBuilder.Build()
|
||||
for _, artifact := range r.task.Artifacts {
|
||||
for _, artifact := range task.Artifacts {
|
||||
if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil {
|
||||
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
|
||||
r.logger.Printf("[DEBUG] client: %v", wrapped)
|
||||
@@ -941,7 +943,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
}
|
||||
|
||||
// We don't have to wait for any template
|
||||
if len(r.task.Templates) == 0 {
|
||||
if len(task.Templates) == 0 {
|
||||
// Send the start signal
|
||||
select {
|
||||
case r.startCh <- struct{}{}:
|
||||
@@ -955,12 +957,12 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
|
||||
// Build the template manager
|
||||
if r.templateManager == nil {
|
||||
var err error
|
||||
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
|
||||
r.templateManager, err = NewTaskTemplateManager(r, task.Templates,
|
||||
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to build task's template manager: %v", err)
|
||||
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
|
||||
r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
|
||||
r.logger.Printf("[ERR] client: alloc %q, task %q %v", alloc.ID, task.Name, err)
|
||||
resultCh <- false
|
||||
return
|
||||
}
|
||||
@@ -1022,7 +1024,7 @@ func (r *TaskRunner) run() {
|
||||
for {
|
||||
// Do the prestart activities
|
||||
prestartResultCh := make(chan bool, 1)
|
||||
go r.prestart(prestartResultCh)
|
||||
go r.prestart(r.alloc, r.task, prestartResultCh)
|
||||
|
||||
WAIT:
|
||||
for {
|
||||
|
||||
Reference in New Issue
Block a user