diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index 3e3976264..372c5abba 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -19,6 +19,12 @@ const (
     // allocSyncRetryIntv is the interval on which we retry updating
     // the status of the allocation
     allocSyncRetryIntv = 15 * time.Second
+
+    // taskPendingSyncLimit is how long the client will wait before telling the
+    // server that a task was received. If the received task transitions to any
+    // other state first, the server gets that update instead. This limit exists
+    // only for the pathological case in which no other transition occurs.
+    taskPendingSyncLimit = 30 * time.Second
 )

 // AllocStateUpdater is used to update the status of an allocation
@@ -45,7 +51,12 @@ type AllocRunner struct {
     restored map[string]struct{}
     taskLock sync.RWMutex

-    taskStatusLock sync.RWMutex
+    // taskPendingTimer is used to debounce updates sent to the server, since
+    // we expect an alloc to transition state shortly after being received. The
+    // timer sends the update if that hasn't happened within a reasonable time.
+    taskPendingTimer *time.Timer
+    taskStatusLock   sync.RWMutex

     updateCh chan *structs.Allocation

@@ -323,6 +334,24 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
     taskState.State = state
     r.appendTaskEvent(taskState, event)

+    // We don't immediately mark ourselves as dirty, since in most cases
+    // another state transition will quickly follow. This reduces traffic to
+    // the server.
+    if state == structs.TaskStatePending {
+        if r.taskPendingTimer == nil {
+            r.taskPendingTimer = time.AfterFunc(taskPendingSyncLimit, func() {
+                // Send a dirty signal to sync our state.
+                r.dirtyCh <- struct{}{}
+            })
+        }
+        return
+    }
+
+    // Cancel any existing pending state timer.
+    if r.taskPendingTimer != nil {
+        r.taskPendingTimer.Stop()
+    }
+
     select {
     case r.dirtyCh <- struct{}{}:
     default:
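The setTaskState hunk above implements a debounce: a pending state only arms a one-shot timer, and any later transition cancels the timer and syncs immediately. Below is a minimal standalone sketch of that pattern; the syncer type and its names are illustrative stand-ins, not Nomad's API, and the limit is shortened so the example runs quickly.

package main

import (
    "fmt"
    "sync"
    "time"
)

// pendingSyncLimit plays the role of taskPendingSyncLimit above.
const pendingSyncLimit = 200 * time.Millisecond

// syncer is a hypothetical stand-in for AllocRunner, keeping only the
// pieces the debounce needs.
type syncer struct {
    mu      sync.Mutex
    timer   *time.Timer
    dirtyCh chan struct{}
}

// setState mirrors the shape of setTaskState: a pending state arms a
// one-shot timer instead of signalling immediately; any other state stops
// the timer and signals right away.
func (s *syncer) setState(pending bool) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if pending {
        if s.timer == nil {
            s.timer = time.AfterFunc(pendingSyncLimit, func() {
                // Pathological case: nothing else happened, sync anyway.
                s.dirtyCh <- struct{}{}
            })
        }
        return
    }

    if s.timer != nil {
        s.timer.Stop()
    }

    select {
    case s.dirtyCh <- struct{}{}:
    default:
    }
}

func main() {
    s := &syncer{dirtyCh: make(chan struct{}, 1)}
    s.setState(true)  // pending: timer armed, no traffic yet
    s.setState(false) // running: timer cancelled, one dirty signal sent
    <-s.dirtyCh
    fmt.Println("synced once, not twice")
}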
diff --git a/client/task_runner.go b/client/task_runner.go
index 3d1b6f5dd..2a683ed9c 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -95,6 +95,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
         destroyCh: make(chan struct{}),
         waitCh:    make(chan struct{}),
     }
+
+    // Set the state to pending.
+    tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
     return tc
 }

diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index c757c5893..f31348560 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -68,20 +68,24 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
         t.Fatalf("timeout")
     }

-    if len(upd.events) != 2 {
-        t.Fatalf("should have 2 updates: %#v", upd.events)
+    if len(upd.events) != 3 {
+        t.Fatalf("should have 3 updates: %#v", upd.events)
     }

     if upd.state != structs.TaskStateDead {
         t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
     }

-    if upd.events[0].Type != structs.TaskStarted {
-        t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
+    if upd.events[0].Type != structs.TaskReceived {
+        t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
     }

-    if upd.events[1].Type != structs.TaskTerminated {
-        t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated)
+    if upd.events[1].Type != structs.TaskStarted {
+        t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+    }
+
+    if upd.events[2].Type != structs.TaskTerminated {
+        t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
     }
 }

@@ -107,20 +111,24 @@ func TestTaskRunner_Destroy(t *testing.T) {
         t.Fatalf("timeout")
     }

-    if len(upd.events) != 2 {
-        t.Fatalf("should have 2 updates: %#v", upd.events)
+    if len(upd.events) != 3 {
+        t.Fatalf("should have 3 updates: %#v", upd.events)
     }

     if upd.state != structs.TaskStateDead {
         t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
     }

-    if upd.events[0].Type != structs.TaskStarted {
-        t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
+    if upd.events[0].Type != structs.TaskReceived {
+        t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
     }

-    if upd.events[1].Type != structs.TaskKilled {
-        t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled)
+    if upd.events[1].Type != structs.TaskStarted {
+        t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+    }
+
+    if upd.events[2].Type != structs.TaskKilled {
+        t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
     }
 }
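The repeated count-and-order assertions above could be collapsed into a single helper. A sketch of one follows; assertEvents is hypothetical and not part of the Nomad test suite, though the structs.TaskEvent fields it reads are the ones used in this diff.

package client

import (
    "testing"

    "github.com/hashicorp/nomad/nomad/structs"
)

// assertEvents fails the test unless the observed task events match the
// expected event types, in order.
func assertEvents(t *testing.T, events []*structs.TaskEvent, want []string) {
    if len(events) != len(want) {
        t.Fatalf("should have %d updates: %#v", len(want), events)
    }
    for i, typ := range want {
        if events[i].Type != typ {
            t.Fatalf("event %d was %v; want %v", i, events[i].Type, typ)
        }
    }
}

With it, the body of TestTaskRunner_SimpleRun would reduce to a single call: assertEvents(t, upd.events, []string{structs.TaskReceived, structs.TaskStarted, structs.TaskTerminated}).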
diff --git a/nomad/leader.go b/nomad/leader.go
index 229a9f219..88a62802f 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -102,7 +102,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
     // evaluation broker
     if numWorkers := len(s.workers); numWorkers > 1 {
         // Disabling half the workers frees half the CPUs.
-        for i := 0; i < numWorkers / 2; i++ {
+        for i := 0; i < numWorkers/2; i++ {
             s.workers[i].SetPause(true)
         }
     }
@@ -366,7 +366,7 @@ func (s *Server) revokeLeadership() error {
     // Unpause our worker if we paused previously
     if len(s.workers) > 1 {
-        for i := 0; i < len(s.workers) / 2; i++ {
+        for i := 0; i < len(s.workers)/2; i++ {
             s.workers[i].SetPause(false)
         }
     }

diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index e2f733e6d..93f43f57a 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1687,6 +1687,10 @@ const (
     // failure in the driver.
     TaskDriverFailure = "Driver Failure"

+    // Task Received signals that the task has been pulled by the client at
+    // the given timestamp.
+    TaskReceived = "Received"
+
     // Task Started signals that the task was started and its timestamp can be
     // used to determine the running length of the task.
     TaskStarted = "Started"
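Since the comments above indicate each event carries a timestamp, a consumer could derive how long a task sat pending from the gap between its Received and Started events. A hypothetical, self-contained sketch follows; TaskEvent here mirrors only the relevant fields of structs.TaskEvent, and pendingLatency is illustrative, not a Nomad function.

package main

import (
    "fmt"
    "time"
)

const (
    TaskReceived = "Received"
    TaskStarted  = "Started"
)

// TaskEvent mirrors the relevant fields of structs.TaskEvent: an event type
// and a unix-nanosecond timestamp.
type TaskEvent struct {
    Type string
    Time int64
}

// pendingLatency returns the delta between a task's Received and Started
// events, i.e. how long it stayed pending on the client.
func pendingLatency(events []*TaskEvent) (time.Duration, bool) {
    var received, started int64
    for _, e := range events {
        switch e.Type {
        case TaskReceived:
            received = e.Time
        case TaskStarted:
            started = e.Time
        }
    }
    if received == 0 || started == 0 {
        return 0, false
    }
    return time.Duration(started - received), true
}

func main() {
    now := time.Now().UnixNano()
    events := []*TaskEvent{
        {Type: TaskReceived, Time: now},
        {Type: TaskStarted, Time: now + int64(5*time.Second)},
    }
    if d, ok := pendingLatency(events); ok {
        fmt.Println("task was pending for", d) // prints "5s"
    }
}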