mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 02:45:42 +03:00
Move restart tracker creation into task runner
This commit is contained in:
@@ -40,12 +40,11 @@ type AllocRunner struct {
|
||||
|
||||
dirtyCh chan struct{}
|
||||
|
||||
ctx *driver.ExecContext
|
||||
tasks map[string]*TaskRunner
|
||||
taskStates map[string]*structs.TaskState
|
||||
restored map[string]struct{}
|
||||
RestartPolicy *structs.RestartPolicy
|
||||
taskLock sync.RWMutex
|
||||
ctx *driver.ExecContext
|
||||
tasks map[string]*TaskRunner
|
||||
taskStates map[string]*structs.TaskState
|
||||
restored map[string]struct{}
|
||||
taskLock sync.RWMutex
|
||||
|
||||
taskStatusLock sync.RWMutex
|
||||
|
||||
@@ -62,7 +61,6 @@ type allocRunnerState struct {
|
||||
Alloc *structs.Allocation
|
||||
AllocClientStatus string
|
||||
AllocClientDescription string
|
||||
RestartPolicy *structs.RestartPolicy
|
||||
TaskStates map[string]*structs.TaskState
|
||||
Context *driver.ExecContext
|
||||
}
|
||||
@@ -102,7 +100,6 @@ func (r *AllocRunner) RestoreState() error {
|
||||
|
||||
// Restore fields
|
||||
r.alloc = snap.Alloc
|
||||
r.RestartPolicy = snap.RestartPolicy
|
||||
r.ctx = snap.Context
|
||||
r.allocClientStatus = snap.AllocClientStatus
|
||||
r.allocClientDescription = snap.AllocClientDescription
|
||||
@@ -115,9 +112,8 @@ func (r *AllocRunner) RestoreState() error {
|
||||
r.restored[name] = struct{}{}
|
||||
|
||||
task := &structs.Task{Name: name}
|
||||
restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
|
||||
r.alloc, task, restartTracker, r.consulService)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
|
||||
task, r.consulService)
|
||||
r.tasks[name] = tr
|
||||
|
||||
// Skip tasks in terminal states.
|
||||
@@ -163,7 +159,6 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
||||
defer r.allocLock.Unlock()
|
||||
snap := allocRunnerState{
|
||||
Alloc: r.alloc,
|
||||
RestartPolicy: r.RestartPolicy,
|
||||
Context: r.ctx,
|
||||
AllocClientStatus: r.allocClientStatus,
|
||||
AllocClientDescription: r.allocClientDescription,
|
||||
@@ -347,9 +342,6 @@ func (r *AllocRunner) Run() {
|
||||
return
|
||||
}
|
||||
|
||||
// Extract the RestartPolicy from the TG and set it on the alloc
|
||||
r.RestartPolicy = tg.RestartPolicy
|
||||
|
||||
// Create the execution context
|
||||
if r.ctx == nil {
|
||||
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
|
||||
@@ -370,9 +362,8 @@ func (r *AllocRunner) Run() {
|
||||
|
||||
// Merge in the task resources
|
||||
task.Resources = alloc.TaskResources[task.Name]
|
||||
restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
|
||||
r.alloc, task, restartTracker, r.consulService)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
|
||||
task, r.consulService)
|
||||
r.tasks[task.Name] = tr
|
||||
go tr.Run()
|
||||
}
|
||||
|
||||
@@ -52,7 +52,15 @@ type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
|
||||
func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||
updater TaskStateUpdater, ctx *driver.ExecContext,
|
||||
alloc *structs.Allocation, task *structs.Task,
|
||||
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {
|
||||
consulService *ConsulService) *TaskRunner {
|
||||
|
||||
// Build the restart tracker.
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
|
||||
return nil
|
||||
}
|
||||
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
|
||||
|
||||
tc := &TaskRunner{
|
||||
config: config,
|
||||
|
||||
@@ -48,13 +48,10 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
|
||||
allocDir.Build([]*structs.Task{task})
|
||||
|
||||
ctx := driver.NewExecContext(allocDir, alloc.ID)
|
||||
rp := structs.NewRestartPolicy(structs.JobTypeService)
|
||||
restartTracker := newRestartTracker(rp, alloc.Job.Type)
|
||||
tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, consulClient)
|
||||
if !restarts {
|
||||
restartTracker = noRestartsTracker()
|
||||
tr.restartTracker = noRestartsTracker()
|
||||
}
|
||||
|
||||
tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, restartTracker, consulClient)
|
||||
return upd, tr
|
||||
}
|
||||
|
||||
@@ -172,8 +169,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
|
||||
// Create a new task runner
|
||||
consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
|
||||
tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
|
||||
tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.restartTracker,
|
||||
consulClient)
|
||||
tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, consulClient)
|
||||
if err := tr2.RestoreState(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user