nomad: avoid split-brain eval handling after leader transition

This commit is contained in:
Armon Dadgar
2015-08-12 15:25:31 -07:00
parent 9d0e6b40bd
commit b3b2c4d540
7 changed files with 136 additions and 73 deletions

View File

@@ -51,6 +51,7 @@ type EvalBroker struct {
// unackEval tracks an unacknowledged evaluation along with the Nack timer
type unackEval struct {
Eval *structs.Evaluation
Token string
NackTimer *time.Timer
}
@@ -164,16 +165,16 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation) error {
}
// Dequeue is used to perform a blocking dequeue
func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, error) {
func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, string, error) {
var timeoutTimer *time.Timer
SCAN:
// Scan for work
eval, err := b.scanForSchedulers(schedulers)
eval, token, err := b.scanForSchedulers(schedulers)
if err != nil {
if timeoutTimer != nil {
timeoutTimer.Stop()
}
return nil, err
return nil, "", err
}
// Check if we have something
@@ -181,7 +182,7 @@ SCAN:
if timeoutTimer != nil {
timeoutTimer.Stop()
}
return eval, nil
return eval, token, nil
}
// Setup the timeout channel the first time around
@@ -194,18 +195,18 @@ SCAN:
if scan {
goto SCAN
}
return nil, nil
return nil, "", nil
}
// scanForSchedulers scans for work on any of the schedulers. The highest priority work
// is dequeued first. This may return nothing if there is no work waiting.
func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, error) {
func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, string, error) {
b.l.Lock()
defer b.l.Unlock()
// Do nothing if not enabled
if !b.enabled {
return nil, fmt.Errorf("eval broker disabled")
return nil, "", fmt.Errorf("eval broker disabled")
}
// Scan for eligible work
@@ -241,7 +242,7 @@ func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation
switch n := len(eligibleSched); n {
case 0:
// No work to do!
return nil, nil
return nil, "", nil
case 1:
// Only a single task, dequeue
@@ -257,21 +258,25 @@ func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation
// dequeueForSched is used to dequeue the next work item for a given scheduler.
// This assumes locks are held and that this scheduler has work
func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, error) {
func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, string, error) {
// Get the pending queue
pending := b.ready[sched]
raw := heap.Pop(&pending)
b.ready[sched] = pending
eval := raw.(*structs.Evaluation)
// Generate a UUID for the token
token := generateUUID()
// Setup Nack timer
nackTimer := time.AfterFunc(b.nackTimeout, func() {
b.Nack(eval.ID)
b.Nack(eval.ID, token)
})
// Add to the unack queue
b.unack[eval.ID] = &unackEval{
Eval: eval,
Token: token,
NackTimer: nackTimer,
}
@@ -282,7 +287,7 @@ func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, error)
bySched.Ready -= 1
bySched.Unacked += 1
return eval, nil
return eval, token, nil
}
// waitForSchedulers is used to wait for work on any of the scheduler or until a timeout.
@@ -327,15 +332,19 @@ func (b *EvalBroker) waitForSchedulers(schedulers []string, timeoutCh <-chan tim
}
// Outstanding checks if an EvalID has been delivered but not acknowledged
func (b *EvalBroker) Outstanding(evalID string) bool {
// and returns the associated token for the evaluation.
func (b *EvalBroker) Outstanding(evalID string) (string, bool) {
b.l.RLock()
defer b.l.RUnlock()
_, ok := b.unack[evalID]
return ok
unack, ok := b.unack[evalID]
if !ok {
return "", false
}
return unack.Token, true
}
// Ack is used to positively acknowledge handling an evaluation
func (b *EvalBroker) Ack(evalID string) error {
func (b *EvalBroker) Ack(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
@@ -344,6 +353,9 @@ func (b *EvalBroker) Ack(evalID string) error {
if !ok {
return fmt.Errorf("Evaluation ID not found")
}
if unack.Token != token {
return fmt.Errorf("Token does not match for Evaluation ID")
}
jobID := unack.Eval.JobID
// Ensure we were able to stop the timer
@@ -377,7 +389,7 @@ func (b *EvalBroker) Ack(evalID string) error {
}
// Nack is used to negatively acknowledge handling an evaluation
func (b *EvalBroker) Nack(evalID string) error {
func (b *EvalBroker) Nack(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
@@ -386,6 +398,9 @@ func (b *EvalBroker) Nack(evalID string) error {
if !ok {
return fmt.Errorf("Evaluation ID not found")
}
if unack.Token != token {
return fmt.Errorf("Token does not match for Evaluation ID")
}
// Stop the timer, doesn't matter if we've missed it
unack.NackTimer.Stop()

View File

@@ -73,7 +73,7 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
}
// Dequeue should work
out, err := b.Dequeue(defaultSched, time.Second)
out, token, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -81,9 +81,13 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
t.Fatalf("bad : %#v", out)
}
if !b.Outstanding(out.ID) {
tokenOut, ok := b.Outstanding(out.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if tokenOut != token {
t.Fatalf("Bad: %#v %#v", token, tokenOut)
}
// Check the stats
stats = b.Stats()
@@ -100,13 +104,19 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
t.Fatalf("bad: %#v", stats)
}
// Nack with wrong token should fail
err = b.Nack(eval.ID, "foobarbaz")
if err == nil {
t.Fatalf("should fail to nack")
}
// Nack back into the queue
err = b.Nack(eval.ID)
err = b.Nack(eval.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
if b.Outstanding(out.ID) {
if _, ok := b.Outstanding(out.ID); ok {
t.Fatalf("should not be outstanding")
}
@@ -126,25 +136,38 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
}
// Dequeue should work again
out2, err := b.Dequeue(defaultSched, time.Second)
out2, token2, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out2 != eval {
t.Fatalf("bad : %#v", out2)
}
if token2 == token {
t.Fatalf("should get a new token")
}
if !b.Outstanding(out.ID) {
tokenOut2, ok := b.Outstanding(out.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if tokenOut2 != token2 {
t.Fatalf("Bad: %#v %#v", token2, tokenOut2)
}
// Ack with wrong token
err = b.Ack(eval.ID, "zip")
if err == nil {
t.Fatalf("should fail to ack")
}
// Ack finally
err = b.Ack(eval.ID)
err = b.Ack(eval.ID, token2)
if err != nil {
t.Fatalf("err: %v", err)
}
if b.Outstanding(out.ID) {
if _, ok := b.Outstanding(out.ID); ok {
t.Fatalf("should not be outstanding")
}
@@ -199,7 +222,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
}
// Dequeue should work
out, err := b.Dequeue(defaultSched, time.Second)
out, token, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -220,7 +243,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
}
// Ack out
err = b.Ack(eval.ID)
err = b.Ack(eval.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -238,7 +261,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
}
// Dequeue should work
out, err = b.Dequeue(defaultSched, time.Second)
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -259,7 +282,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
}
// Ack out
err = b.Ack(eval2.ID)
err = b.Ack(eval2.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -277,7 +300,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
}
// Dequeue should work
out, err = b.Dequeue(defaultSched, time.Second)
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -298,7 +321,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
}
// Ack out
err = b.Ack(eval3.ID)
err = b.Ack(eval3.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -348,7 +371,7 @@ func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
b.SetEnabled(true)
start := time.Now()
out, err := b.Dequeue(defaultSched, 5*time.Millisecond)
out, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
end := time.Now()
if err != nil {
@@ -380,17 +403,17 @@ func TestEvalBroker_Dequeue_Priority(t *testing.T) {
eval3.Priority = 20
b.Enqueue(eval3)
out1, _ := b.Dequeue(defaultSched, time.Second)
out1, _, _ := b.Dequeue(defaultSched, time.Second)
if out1 != eval2 {
t.Fatalf("bad: %#v", out1)
}
out2, _ := b.Dequeue(defaultSched, time.Second)
out2, _, _ := b.Dequeue(defaultSched, time.Second)
if out2 != eval3 {
t.Fatalf("bad: %#v", out2)
}
out3, _ := b.Dequeue(defaultSched, time.Second)
out3, _, _ := b.Dequeue(defaultSched, time.Second)
if out3 != eval1 {
t.Fatalf("bad: %#v", out3)
}
@@ -410,7 +433,7 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
}
for i := 0; i < NUM; i++ {
out1, _ := b.Dequeue(defaultSched, time.Second)
out1, _, _ := b.Dequeue(defaultSched, time.Second)
if out1.CreateIndex != uint64(i) {
t.Fatalf("bad: %d %#v", i, out1)
}
@@ -435,7 +458,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
counter := 0
for i := 0; i < NUM; i++ {
out1, _ := b.Dequeue(defaultSched, time.Second)
out1, _, _ := b.Dequeue(defaultSched, time.Second)
switch out1.Type {
case structs.JobTypeService:
@@ -467,7 +490,7 @@ func TestEvalBroker_Dequeue_Blocked(t *testing.T) {
outCh := make(chan *structs.Evaluation, 1)
go func() {
start := time.Now()
out, err := b.Dequeue(defaultSched, time.Second)
out, _, err := b.Dequeue(defaultSched, time.Second)
end := time.Now()
outCh <- out
if err != nil {
@@ -512,7 +535,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
}
// Dequeue
out, err := b.Dequeue(defaultSched, time.Second)
out, _, err := b.Dequeue(defaultSched, time.Second)
start := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
@@ -522,7 +545,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
}
// Dequeue, should block on Nack timer
out, err = b.Dequeue(defaultSched, time.Second)
out, _, err = b.Dequeue(defaultSched, time.Second)
end := time.Now()
if err != nil {
t.Fatalf("err: %v", err)

View File

@@ -56,7 +56,7 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
// Dequeue is used to dequeue a pending evaluation
func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
reply *structs.SingleEvalResponse) error {
reply *structs.EvalDequeueResponse) error {
if done, err := e.srv.forward("Eval.Dequeue", args, args, reply); done {
return err
}
@@ -73,7 +73,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
}
// Attempt the dequeue
eval, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout)
eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout)
if err != nil {
return err
}
@@ -81,6 +81,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
// Provide the output if any
if eval != nil {
reply.Eval = eval
reply.Token = token
}
// Set the query response
@@ -89,7 +90,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
}
// Ack is used to acknowledge completion of a dequeued evaluation
func (e *Eval) Ack(args *structs.EvalSpecificRequest,
func (e *Eval) Ack(args *structs.EvalAckRequest,
reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Eval.Ack", args, args, reply); done {
return err
@@ -97,14 +98,14 @@ func (e *Eval) Ack(args *structs.EvalSpecificRequest,
defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now())
// Ack the EvalID
if err := e.srv.evalBroker.Ack(args.EvalID); err != nil {
if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil {
return err
}
return nil
}
// NAck is used to negative acknowledge completion of a dequeued evaluation
func (e *Eval) Nack(args *structs.EvalSpecificRequest,
func (e *Eval) Nack(args *structs.EvalAckRequest,
reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Eval.Nack", args, args, reply); done {
return err
@@ -112,7 +113,7 @@ func (e *Eval) Nack(args *structs.EvalSpecificRequest,
defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now())
// Nack the EvalID
if err := e.srv.evalBroker.Nack(args.EvalID); err != nil {
if err := e.srv.evalBroker.Nack(args.EvalID, args.Token); err != nil {
return err
}
return nil

View File

@@ -68,7 +68,7 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
Schedulers: defaultSched,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
var resp structs.SingleEvalResponse
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
@@ -78,9 +78,13 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
}
// Ensure outstanding
if !s1.evalBroker.Outstanding(eval1.ID) {
token, ok := s1.evalBroker.Outstanding(eval1.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}
}
func TestEvalEndpoint_Ack(t *testing.T) {
@@ -97,7 +101,7 @@ func TestEvalEndpoint_Ack(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
out, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -106,8 +110,9 @@ func TestEvalEndpoint_Ack(t *testing.T) {
}
// Ack the eval
get := &structs.EvalSpecificRequest{
get := &structs.EvalAckRequest{
EvalID: out.ID,
Token: token,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
var resp structs.GenericResponse
@@ -116,7 +121,7 @@ func TestEvalEndpoint_Ack(t *testing.T) {
}
// Ensure outstanding
if s1.evalBroker.Outstanding(eval1.ID) {
if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok {
t.Fatalf("should not be outstanding")
}
}
@@ -135,14 +140,15 @@ func TestEvalEndpoint_Nack(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
out, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
out, token, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
if out == nil {
t.Fatalf("missing eval")
}
// Ack the eval
get := &structs.EvalSpecificRequest{
// Nack the eval
get := &structs.EvalAckRequest{
EvalID: out.ID,
Token: token,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
var resp structs.GenericResponse
@@ -151,12 +157,12 @@ func TestEvalEndpoint_Nack(t *testing.T) {
}
// Ensure outstanding
if s1.evalBroker.Outstanding(eval1.ID) {
if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok {
t.Fatalf("should not be outstanding")
}
// Should get it back
out2, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
out2, _, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
if out2 != out {
t.Fatalf("nack failed")
}

View File

@@ -179,6 +179,13 @@ type EvalSpecificRequest struct {
WriteRequest
}
// EvalAckRequest is used to Ack/Nack a specific evaluation
type EvalAckRequest struct {
EvalID string
Token string
WriteRequest
}
// EvalDequeueRequest is used when we want to dequeue an evaluation
type EvalDequeueRequest struct {
Schedulers []string
@@ -251,6 +258,13 @@ type SingleEvalResponse struct {
QueryMeta
}
// EvalDequeueResponse is used to return from a dequeue
type EvalDequeueResponse struct {
Eval *Evaluation
Token string
QueryMeta
}
// PlanResponse is used to return from a PlanRequest
type PlanResponse struct {
Result *PlanResult

View File

@@ -54,37 +54,37 @@ func NewWorker(srv *Server) (*Worker, error) {
func (w *Worker) run() {
for {
// Dequeue a pending evaluation
eval, shutdown := w.dequeueEvaluation(dequeueTimeout)
eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout)
if shutdown {
return
}
// Check for a shutdown
if w.srv.IsShutdown() {
w.sendAck(eval.ID, false)
w.sendAck(eval.ID, token, false)
return
}
// Wait for the the raft log to catchup to the evaluation
if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil {
w.sendAck(eval.ID, false)
w.sendAck(eval.ID, token, false)
continue
}
// Invoke the scheduler to determine placements
if err := w.invokeScheduler(eval); err != nil {
w.sendAck(eval.ID, false)
w.sendAck(eval.ID, token, false)
continue
}
// Complete the evaluation
w.sendAck(eval.ID, true)
w.sendAck(eval.ID, token, true)
}
}
// dequeueEvaluation is used to fetch the next ready evaluation.
// This blocks until an evaluation is available or a timeout is reached.
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, bool) {
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
// Setup the request
req := structs.EvalDequeueRequest{
Schedulers: w.srv.config.EnabledSchedulers,
@@ -93,7 +93,7 @@ func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation,
Region: w.srv.config.Region,
},
}
var resp structs.SingleEvalResponse
var resp structs.EvalDequeueResponse
REQ:
// Make a blocking RPC
@@ -103,7 +103,7 @@ REQ:
if err != nil {
w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
if w.backoffErr() {
return nil, true
return nil, "", true
}
goto REQ
}
@@ -112,23 +112,24 @@ REQ:
// Check if we got a response
if resp.Eval != nil {
w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID)
return resp.Eval, false
return resp.Eval, resp.Token, false
}
// Check for potential shutdown
if w.srv.IsShutdown() {
return nil, true
return nil, "", true
}
goto REQ
}
// sendAck makes a best effort to ack or nack the evaluation.
// Any errors are logged but swallowed.
func (w *Worker) sendAck(evalID string, ack bool) {
func (w *Worker) sendAck(evalID, token string, ack bool) {
defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now())
// Setup the request
req := structs.EvalSpecificRequest{
req := structs.EvalAckRequest{
EvalID: evalID,
Token: token,
WriteRequest: structs.WriteRequest{
Region: w.srv.config.Region,
},

View File

@@ -57,10 +57,13 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
w := &Worker{srv: s1, logger: s1.logger}
// Attempt dequeue
eval, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
@@ -85,7 +88,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) {
}()
// Attempt dequeue
eval, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if !shutdown {
t.Fatalf("should not shutdown")
}
@@ -112,7 +115,7 @@ func TestWorker_sendAck(t *testing.T) {
w := &Worker{srv: s1, logger: s1.logger}
// Attempt dequeue
eval, _ := w.dequeueEvaluation(10 * time.Millisecond)
eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond)
// Check the depth is 0, 1 unacked
stats := s1.evalBroker.Stats()
@@ -121,7 +124,7 @@ func TestWorker_sendAck(t *testing.T) {
}
// Send the Nack
w.sendAck(eval.ID, false)
w.sendAck(eval.ID, token, false)
// Check the depth is 1, nothing unacked
stats = s1.evalBroker.Stats()
@@ -130,10 +133,10 @@ func TestWorker_sendAck(t *testing.T) {
}
// Attempt dequeue
eval, _ = w.dequeueEvaluation(10 * time.Millisecond)
eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond)
// Send the Ack
w.sendAck(eval.ID, true)
w.sendAck(eval.ID, token, true)
// Check the depth is 0
stats = s1.evalBroker.Stats()