Client stores when it receives a task

This commit is contained in:
Alex Dadgar
2016-02-19 14:49:43 -08:00
parent d13262589f
commit eb76d6d623
5 changed files with 59 additions and 15 deletions

View File

@@ -19,6 +19,12 @@ const (
 	// allocSyncRetryIntv is the interval on which we retry updating
 	// the status of the allocation
 	allocSyncRetryIntv = 15 * time.Second
+
+	// taskPendingSyncLimit is how long the client waits before telling the
+	// server that a task was received. If the task transitions to any other
+	// state first, the server gets that update instead; the limit only
+	// covers the pathological case in which no other transition occurs.
+	taskPendingSyncLimit = 30 * time.Second
 )

 // AllocStateUpdater is used to update the status of an allocation
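
The constant above drives a simple deferred-sync pattern: arm a timer when the task enters the pending state, and let any later transition preempt it. Below is a minimal, runnable sketch of that pattern with a shortened interval; the names mirror the commit, but the surrounding program is illustrative only.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-ins for the alloc runner's dirtyCh and taskPendingSyncLimit;
	// the interval is shortened so the sketch runs quickly.
	dirtyCh := make(chan struct{}, 1)
	pendingSyncLimit := 100 * time.Millisecond

	// Arm the timer when the task enters the pending state. If no other
	// transition fires first, the timer forces a sync.
	pendingTimer := time.AfterFunc(pendingSyncLimit, func() {
		dirtyCh <- struct{}{}
	})
	defer pendingTimer.Stop()

	<-dirtyCh
	fmt.Println("no other transition occurred; synced the pending state")
}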
@@ -45,7 +51,12 @@ type AllocRunner struct {
 	restored map[string]struct{}

 	taskLock sync.RWMutex

+	// taskPendingTimer coalesces updates sent to the server: we expect an
+	// alloc to transition state shortly after being received, so the timer
+	// sends the update only if that hasn't happened after a reasonable time.
+	taskPendingTimer *time.Timer
+
 	taskStatusLock sync.RWMutex

 	updateCh chan *structs.Allocation
@@ -323,6 +334,24 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
 	taskState.State = state
 	r.appendTaskEvent(taskState, event)

+	// We don't immediately mark ourselves as dirty, since in most cases
+	// another state transition will quickly follow. This reduces traffic
+	// to the server.
+	if state == structs.TaskStatePending {
+		if r.taskPendingTimer == nil {
+			r.taskPendingTimer = time.AfterFunc(taskPendingSyncLimit, func() {
+				// Send a dirty signal to sync our state.
+				r.dirtyCh <- struct{}{}
+			})
+		}
+		return
+	}
+
+	// Cancel any existing pending state timer.
+	if r.taskPendingTimer != nil {
+		r.taskPendingTimer.Stop()
+	}
+
 	select {
 	case r.dirtyCh <- struct{}{}:
 	default:
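
The trailing select with a default case makes the dirty notification non-blocking: if a sync is already queued, further notifications are dropped rather than stalling the state transition. A small runnable sketch of that coalescing behavior, assuming dirtyCh is buffered with capacity 1 as the blocking send in the timer callback implies:

package main

import "fmt"

func main() {
	// A 1-buffered channel mirrors the alloc runner's dirtyCh.
	dirtyCh := make(chan struct{}, 1)

	for i := 0; i < 3; i++ {
		select {
		case dirtyCh <- struct{}{}:
			fmt.Println("queued a sync")
		default:
			fmt.Println("sync already pending; coalesced")
		}
	}
}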

View File

@@ -95,6 +95,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		destroyCh: make(chan struct{}),
 		waitCh:    make(chan struct{}),
 	}

+	// Set the state to pending.
+	tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
+
 	return tc
 }
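
NewTaskRunner now reports the pending state as soon as the runner is constructed, so the server learns a task was received even before the driver starts it. Here is a sketch of an updater with the signature the call above implies; the logging body and the task name "web" are illustrative stand-ins for the alloc runner's real setTaskState.

package main

import (
	"log"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Matches the three-argument call in NewTaskRunner above.
	updater := func(taskName, state string, event *structs.TaskEvent) {
		log.Printf("task %q -> %s (event: %s)", taskName, state, event.Type)
	}

	updater("web", structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
}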

View File

@@ -68,20 +68,24 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
 		t.Fatalf("timeout")
 	}

-	if len(upd.events) != 2 {
-		t.Fatalf("should have 2 updates: %#v", upd.events)
+	if len(upd.events) != 3 {
+		t.Fatalf("should have 3 updates: %#v", upd.events)
 	}

 	if upd.state != structs.TaskStateDead {
 		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
 	}

-	if upd.events[0].Type != structs.TaskStarted {
-		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
 	}

-	if upd.events[1].Type != structs.TaskTerminated {
-		t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated)
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
 	}
+
+	if upd.events[2].Type != structs.TaskTerminated {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
+	}
 }
@@ -107,20 +111,24 @@ func TestTaskRunner_Destroy(t *testing.T) {
 		t.Fatalf("timeout")
 	}

-	if len(upd.events) != 2 {
-		t.Fatalf("should have 2 updates: %#v", upd.events)
+	if len(upd.events) != 3 {
+		t.Fatalf("should have 3 updates: %#v", upd.events)
 	}

 	if upd.state != structs.TaskStateDead {
 		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
 	}

-	if upd.events[0].Type != structs.TaskStarted {
-		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
 	}

-	if upd.events[1].Type != structs.TaskKilled {
-		t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled)
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
 	}
+
+	if upd.events[2].Type != structs.TaskKilled {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
+	}
 }
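
Both tests now assert the same three-event shape: Received, then Started, then a terminal event. A hypothetical helper (not part of the commit) that would collapse the repeated per-index assertions; it assumes upd.events is a []*structs.TaskEvent, as the %#v output above suggests.

func assertEventTypes(t *testing.T, events []*structs.TaskEvent, want ...string) {
	if len(events) != len(want) {
		t.Fatalf("should have %d updates: %#v", len(want), events)
	}
	for i, typ := range want {
		if events[i].Type != typ {
			t.Fatalf("event %d was %v; want %v", i, events[i].Type, typ)
		}
	}
}

With it, each test body would reduce to a single call such as assertEventTypes(t, upd.events, structs.TaskReceived, structs.TaskStarted, structs.TaskTerminated), or its TaskKilled counterpart.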

View File

@@ -102,7 +102,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// evaluation broker
 	if numWorkers := len(s.workers); numWorkers > 1 {
 		// Disabling half the workers frees half the CPUs.
-		for i := 0; i < numWorkers / 2; i++ {
+		for i := 0; i < numWorkers/2; i++ {
 			s.workers[i].SetPause(true)
 		}
 	}
@@ -366,7 +366,7 @@ func (s *Server) revokeLeadership() error {
 	// Unpause our workers if we paused previously
 	if len(s.workers) > 1 {
-		for i := 0; i < len(s.workers) / 2; i++ {
+		for i := 0; i < len(s.workers)/2; i++ {
 			s.workers[i].SetPause(false)
 		}
 	}
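
Both loops use the same floor division, so the worker indexes paused at election are exactly the ones resumed at revocation, even for odd worker counts. A quick illustrative check:

package main

import "fmt"

func main() {
	// With integer division, 5 workers pause indexes 0 and 1, leaving a
	// majority of 3 running; the resume loop walks the identical prefix.
	for _, n := range []int{1, 2, 5, 8} {
		fmt.Printf("%d workers: pause/resume indexes [0, %d)\n", n, n/2)
	}
}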

View File

@@ -1687,6 +1687,10 @@ const (
 	// failure in the driver.
 	TaskDriverFailure = "Driver Failure"

+	// TaskReceived signals that the task has been pulled by the client at the
+	// given timestamp.
+	TaskReceived = "Received"
+
 	// TaskStarted signals that the task was started and its timestamp can be
 	// used to determine the running length of the task.
 	TaskStarted = "Started"
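
The "given timestamp" lives on the event itself. A minimal sketch of reading it back, assuming NewTaskEvent stamps the event's Time field with the current UnixNano time, which is what the client code above relies on:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// NewTaskEvent records when the event was created; for a Received
	// event that is the moment the client pulled the task.
	ev := structs.NewTaskEvent(structs.TaskReceived)
	fmt.Printf("task received at %s\n", time.Unix(0, ev.Time))
}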