Worker waitForIndex uses StateStore index, not Raft Applied Index

This commit is contained in:
Alex Dadgar
2016-06-22 09:04:22 -07:00
parent 3bbf3a56ee
commit 6df0da58f8
6 changed files with 64 additions and 34 deletions

View File

@@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1001)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
@@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)

View File

@@ -252,13 +252,27 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
defer jobGC.Stop()
for {
// Snapshot the current state
snap, err := s.fsm.State().Snapshot()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to snapshot state for periodic GC: %v", err)
continue
}
// Store the snapshot's index
snapshotIndex, err := snap.LatestIndex()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to determine snapshot's index for periodic GC: %v", err)
continue
}
select {
case <-evalGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, snapshotIndex))
case <-nodeGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, snapshotIndex))
case <-jobGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, snapshotIndex))
case <-stopCh:
return
}
@@ -266,7 +280,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
}
// coreJobEval returns an evaluation for a core job
func (s *Server) coreJobEval(job string) *structs.Evaluation {
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
return &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: structs.CoreJobPriority,
@@ -274,7 +288,7 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
TriggeredBy: structs.EvalTriggerScheduled,
JobID: job,
Status: structs.EvalStatusPending,
ModifyIndex: s.raft.AppliedIndex(),
ModifyIndex: modifyIndex,
}
}

View File

@@ -1,6 +1,8 @@
package nomad
import (
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -16,6 +18,18 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
return err
}
s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
// Snapshot the current state
snap, err := s.srv.fsm.State().Snapshot()
if err != nil {
return fmt.Errorf("failed to snapshot state: %v", err)
}
// Store the snapshot's index
snapshotIndex, err := snap.LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine snapshot's index: %v", err)
}
s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
return nil
}

View File

@@ -11,9 +11,6 @@ import (
)
func TestSystemEndpoint_GarbageCollect(t *testing.T) {
//s1 := testServer(t, func(c *Config) {
//c.NumSchedulers = 0 // Prevent automatic dequeue
//})
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
@@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
if err := state.UpsertJob(0, job); err != nil {
if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("UpsertAllocs() failed: %v", err)
}

View File

@@ -215,9 +215,20 @@ func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
CHECK:
// Snapshot the current state
snap, err := w.srv.fsm.State().Snapshot()
if err != nil {
return fmt.Errorf("failed to snapshot state: %v", err)
}
// Store the snapshot's index
snapshotIndex, err := snap.LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine snapshot's index: %v", err)
}
// We only need the FSM state to be as recent as the given index
appliedIndex := w.srv.raft.AppliedIndex()
if index <= appliedIndex {
if index <= snapshotIndex {
w.backoffReset()
return nil
}

View File

@@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) {
// Cause an increment
go func() {
time.Sleep(10 * time.Millisecond)
s1.raft.Barrier(0)
n := mock.Node()
if err := s1.fsm.state.UpsertNode(index+1, n); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
}()
// Wait for a future index