From 0900a1dbcc3759b8984890ed9e4e83f0deec58c8 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Mon, 2 Apr 2018 16:39:18 -0700
Subject: [PATCH 1/4] drain: fix double-close panic on drain future

---
 nomad/drainer/drainer.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go
index db345a2ee..b7c7ddf00 100644
--- a/nomad/drainer/drainer.go
+++ b/nomad/drainer/drainer.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/state"
@@ -399,11 +400,12 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs
 	var finalIndex uint64
+	var mErr multierror.Error
 	for _, u := range partitionAllocDrain(transistions, evals) {
 		index, err := n.raft.AllocUpdateDesiredTransition(u.Transistions, u.Evals)
 		if err != nil {
-			future.Respond(index, err)
+			mErr.Errors = append(mErr.Errors, err)
 		}
 		finalIndex = index
 	}
 
-	future.Respond(finalIndex, nil)
+	future.Respond(finalIndex, mErr.ErrorOrNil())
 }

From a8e65c3fd4b68e28969a7da0dff572c9a48500d2 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Mon, 2 Apr 2018 16:40:06 -0700
Subject: [PATCH 2/4] drain: refactor batch_future into its own file

aka What If structs.go Wasn't So Big?
---
 nomad/structs/batch_future.go      | 43 ++++++++++++++++++++++++++++++
 nomad/structs/batch_future_test.go | 35 ++++++++++++++++++++++++
 nomad/structs/structs.go           | 42 -----------------------------
 nomad/structs/structs_test.go      | 28 -------------------
 4 files changed, 78 insertions(+), 70 deletions(-)
 create mode 100644 nomad/structs/batch_future.go
 create mode 100644 nomad/structs/batch_future_test.go

diff --git a/nomad/structs/batch_future.go b/nomad/structs/batch_future.go
new file mode 100644
index 000000000..c0ddc30f3
--- /dev/null
+++ b/nomad/structs/batch_future.go
@@ -0,0 +1,43 @@
+package structs
+
+// BatchFuture is used to wait on a batch update to complete
+type BatchFuture struct {
+	doneCh chan struct{}
+	err    error
+	index  uint64
+}
+
+// NewBatchFuture creates a new batch future
+func NewBatchFuture() *BatchFuture {
+	return &BatchFuture{
+		doneCh: make(chan struct{}),
+	}
+}
+
+// Wait is used to block for the future to complete and returns the error
+func (b *BatchFuture) Wait() error {
+	<-b.doneCh
+	return b.err
+}
+
+// WaitCh is used to block for the future to complete
+func (b *BatchFuture) WaitCh() <-chan struct{} {
+	return b.doneCh
+}
+
+// Error is used to return the error of the batch, only after Wait()
+func (b *BatchFuture) Error() error {
+	return b.err
+}
+
+// Index is used to return the index of the batch, only after Wait()
+func (b *BatchFuture) Index() uint64 {
+	return b.index
+}
+
+// Respond is used to unblock the future
+func (b *BatchFuture) Respond(index uint64, err error) {
+	b.index = index
+	b.err = err
+	close(b.doneCh)
+}
diff --git a/nomad/structs/batch_future_test.go b/nomad/structs/batch_future_test.go
new file mode 100644
index 000000000..52ff12563
--- /dev/null
+++ b/nomad/structs/batch_future_test.go
@@ -0,0 +1,35 @@
+package structs
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestBatchFuture(t *testing.T) {
+	t.Parallel()
+	bf := NewBatchFuture()
+
+	// Async respond to the future
+	expect := fmt.Errorf("testing")
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		bf.Respond(1000, expect)
+	}()
+
+	// Block for the result
+	start := time.Now()
+	err := bf.Wait()
+	diff := 
time.Since(start) + if diff < 5*time.Millisecond { + t.Fatalf("too fast") + } + + // Check the results + if err != expect { + t.Fatalf("bad: %s", err) + } + if bf.Index() != 1000 { + t.Fatalf("bad: %d", bf.Index()) + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 328ff86a1..23aa8fc5b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7101,45 +7101,3 @@ type ACLTokenUpsertResponse struct { Tokens []*ACLToken WriteMeta } - -// BatchFuture is used to wait on a batch update to complete -type BatchFuture struct { - doneCh chan struct{} - err error - index uint64 -} - -// NewBatchFuture creates a new batch future -func NewBatchFuture() *BatchFuture { - return &BatchFuture{ - doneCh: make(chan struct{}), - } -} - -// Wait is used to block for the future to complete and returns the error -func (b *BatchFuture) Wait() error { - <-b.doneCh - return b.err -} - -// WaitCh is used to block for the future to complete -func (b *BatchFuture) WaitCh() <-chan struct{} { - return b.doneCh -} - -// Error is used to return the error of the batch, only after Wait() -func (b *BatchFuture) Error() error { - return b.err -} - -// Index is used to return the index of the batch, only after Wait() -func (b *BatchFuture) Index() uint64 { - return b.index -} - -// Respond is used to unblock the future -func (b *BatchFuture) Respond(index uint64, err error) { - b.index = index - b.err = err - close(b.doneCh) -} diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 1f2193b29..b76bb9b98 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -3652,34 +3652,6 @@ func TestNetworkResourcesEquals(t *testing.T) { } } -func TestBatchFuture(t *testing.T) { - t.Parallel() - bf := NewBatchFuture() - - // Async respond to the future - expect := fmt.Errorf("testing") - go func() { - time.Sleep(10 * time.Millisecond) - bf.Respond(1000, expect) - }() - - // Block for the result - start := time.Now() - err := bf.Wait() - diff := time.Since(start) - if diff < 5*time.Millisecond { - t.Fatalf("too fast") - } - - // Check the results - if err != expect { - t.Fatalf("bad: %s", err) - } - if bf.Index() != 1000 { - t.Fatalf("bad: %d", bf.Index()) - } -} - func TestNode_Canonicalize(t *testing.T) { t.Parallel() require := require.New(t) From feabcac0b78abcd9f86f33391746a9f678bac1f3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 2 Apr 2018 16:40:47 -0700 Subject: [PATCH 3/4] drain: improve tests and fix spelling * transistion -> transition * don't t.Fatal in goroutines * don't mutate global state --- nomad/drainer/drainer.go | 8 ++--- nomad/drainer/drainer_util.go | 52 ++++++++++++++--------------- nomad/drainer/drainer_util_test.go | 20 +++++------ nomad/drainer_int_test.go | 53 +++++++++++++++++++++++------- 4 files changed, 80 insertions(+), 53 deletions(-) diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index b7c7ddf00..77799b80c 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -257,7 +257,7 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) { // Submit the node transistions in a sharded form to ensure a reasonable // Raft transaction size. 
-	for _, nodes := range partitionIds(nodes) {
+	for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
 		if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
 			n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
 		}
@@ -327,7 +327,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 
 	// Submit the node transistions in a sharded form to ensure a reasonable
 	// Raft transaction size.
-	for _, nodes := range partitionIds(done) {
+	for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
 		if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
 			n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
 		}
@@ -398,9 +398,9 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs
 
 	// Commit this update via Raft
 	var finalIndex uint64
 	var mErr multierror.Error
-	for _, u := range partitionAllocDrain(transistions, evals) {
-		index, err := n.raft.AllocUpdateDesiredTransition(u.Transistions, u.Evals)
+	for _, u := range partitionAllocDrain(defaultMaxIdsPerTxn, transistions, evals) {
+		index, err := n.raft.AllocUpdateDesiredTransition(u.Transitions, u.Evals)
 		if err != nil {
 			mErr.Errors = append(mErr.Errors, err)
 		}
diff --git a/nomad/drainer/drainer_util.go b/nomad/drainer/drainer_util.go
index 09d026235..eb27048cb 100644
--- a/nomad/drainer/drainer_util.go
+++ b/nomad/drainer/drainer_util.go
@@ -4,70 +4,70 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-var (
-	// maxIdsPerTxn is the maximum number of IDs that can be included in a
-	// single Raft transaction. This is to ensure that the Raft message does not
-	// become too large.
-	maxIdsPerTxn = (1024 * 256) / 36 // 0.25 MB of ids.
+const (
+	// defaultMaxIdsPerTxn is the maximum number of IDs that can be included in a
+	// single Raft transaction. This is to ensure that the Raft message
+	// does not become too large.
+	defaultMaxIdsPerTxn = (1024 * 256) / 36 // 0.25 MB of ids.
 )
 
 // partitionIds takes a set of IDs and returns a partitioned view of them such
 // that no batch would result in an overly large raft transaction.
-func partitionIds(ids []string) [][]string {
+func partitionIds(maxIds int, ids []string) [][]string {
 	index := 0
 	total := len(ids)
 	var partitions [][]string
 	for remaining := total - index; remaining > 0; remaining = total - index {
-		if remaining < maxIdsPerTxn {
+		if remaining < maxIds {
 			partitions = append(partitions, ids[index:])
 			break
 		} else {
-			partitions = append(partitions, ids[index:index+maxIdsPerTxn])
-			index += maxIdsPerTxn
+			partitions = append(partitions, ids[index:index+maxIds])
+			index += maxIds
 		}
 	}
 
 	return partitions
 }
 
-// transistionTuple is used to group desired transistions and evals
-type transistionTuple struct {
-	Transistions map[string]*structs.DesiredTransition
-	Evals        []*structs.Evaluation
+// transitionTuple is used to group desired transitions and evals
+type transitionTuple struct {
+	Transitions map[string]*structs.DesiredTransition
+	Evals       []*structs.Evaluation
 }
 
-// partitionAllocDrain returns a list of alloc transistions and evals to apply
+// partitionAllocDrain returns a list of alloc transitions and evals to apply
 // in a single raft transaction. This is necessary to ensure that the Raft
 // transaction does not become too large.
-func partitionAllocDrain(transistions map[string]*structs.DesiredTransition, - evals []*structs.Evaluation) []*transistionTuple { +func partitionAllocDrain(maxIds int, transitions map[string]*structs.DesiredTransition, + evals []*structs.Evaluation) []*transitionTuple { - // Determine a stable ordering of the transistioning allocs - allocs := make([]string, 0, len(transistions)) - for id := range transistions { + // Determine a stable ordering of the transitioning allocs + allocs := make([]string, 0, len(transitions)) + for id := range transitions { allocs = append(allocs, id) } - var requests []*transistionTuple + var requests []*transitionTuple submittedEvals, submittedTrans := 0, 0 - for submittedEvals != len(evals) || submittedTrans != len(transistions) { - req := &transistionTuple{ - Transistions: make(map[string]*structs.DesiredTransition), + for submittedEvals != len(evals) || submittedTrans != len(transitions) { + req := &transitionTuple{ + Transitions: make(map[string]*structs.DesiredTransition), } requests = append(requests, req) - available := maxIdsPerTxn + available := maxIds // Add the allocs first if remaining := len(allocs) - submittedTrans; remaining > 0 { if remaining <= available { for _, id := range allocs[submittedTrans:] { - req.Transistions[id] = transistions[id] + req.Transitions[id] = transitions[id] } available -= remaining submittedTrans += remaining } else { for _, id := range allocs[submittedTrans : submittedTrans+available] { - req.Transistions[id] = transistions[id] + req.Transitions[id] = transitions[id] } submittedTrans += available diff --git a/nomad/drainer/drainer_util_test.go b/nomad/drainer/drainer_util_test.go index ee2f4a79f..057c4e24b 100644 --- a/nomad/drainer/drainer_util_test.go +++ b/nomad/drainer/drainer_util_test.go @@ -8,40 +8,38 @@ import ( ) func TestDrainer_PartitionAllocDrain(t *testing.T) { + t.Parallel() // Set the max ids per reap to something lower. - old := maxIdsPerTxn - defer func() { maxIdsPerTxn = old }() - maxIdsPerTxn = 2 + maxIdsPerTxn := 2 require := require.New(t) transistions := map[string]*structs.DesiredTransition{"a": nil, "b": nil, "c": nil} evals := []*structs.Evaluation{nil, nil, nil} - requests := partitionAllocDrain(transistions, evals) + requests := partitionAllocDrain(maxIdsPerTxn, transistions, evals) require.Len(requests, 3) first := requests[0] - require.Len(first.Transistions, 2) + require.Len(first.Transitions, 2) require.Len(first.Evals, 0) second := requests[1] - require.Len(second.Transistions, 1) + require.Len(second.Transitions, 1) require.Len(second.Evals, 1) third := requests[2] - require.Len(third.Transistions, 0) + require.Len(third.Transitions, 0) require.Len(third.Evals, 2) } func TestDrainer_PartitionIds(t *testing.T) { + t.Parallel() require := require.New(t) // Set the max ids per reap to something lower. 
- old := maxIdsPerTxn - defer func() { maxIdsPerTxn = old }() - maxIdsPerTxn = 2 + maxIdsPerTxn := 2 ids := []string{"1", "2", "3", "4", "5"} - requests := partitionIds(ids) + requests := partitionIds(maxIdsPerTxn, ids) require.Len(requests, 3) require.Len(requests[0], 2) require.Len(requests[1], 2) diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index b6cb88950..315a538f7 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -18,10 +18,9 @@ import ( "github.com/stretchr/testify/require" ) -func allocPromoter(t *testing.T, ctx context.Context, +func allocPromoter(errCh chan<- error, ctx context.Context, state *state.StateStore, codec rpc.ClientCodec, nodeID string, logger *log.Logger) { - t.Helper() nindex := uint64(1) for { @@ -31,7 +30,8 @@ func allocPromoter(t *testing.T, ctx context.Context, return } - t.Fatalf("failed to get node allocs: %v", err) + errCh <- fmt.Errorf("failed to get node allocs: %v", err) + return } nindex = index @@ -67,12 +67,23 @@ func allocPromoter(t *testing.T, ctx context.Context, if ctx.Err() == context.Canceled { return } else if err != nil { - require.Nil(t, err) + errCh <- err } } } } +// checkAllocPromoter is a small helper to return an error or nil from an error +// chan like the one given to the allocPromoter goroutine. +func checkAllocPromoter(errCh chan error) error { + select { + case err := <-errCh: + return err + default: + return nil + } +} + func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) { resp, index, err := state.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx) if err != nil { @@ -169,10 +180,11 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) // Wait for the allocs to be replaced + errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) testutil.WaitForResult(func() (bool, error) { allocs, err := state.AllocsByNode(nil, n2.ID) @@ -186,6 +198,9 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { // Check that the node drain is removed testutil.WaitForResult(func() (bool, error) { + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } node, err := state.NodeByID(nil, n1.ID) if err != nil { return false, err @@ -422,14 +437,19 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) // Wait for the allocs to be replaced + errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) // Wait for the allocs to be stopped var finalAllocs []*structs.Allocation testutil.WaitForResult(func() (bool, error) { + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } + var err error finalAllocs, err = state.AllocsByNode(nil, n1.ID) if err != nil { @@ -575,10 +595,11 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { 
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) // Wait for the allocs to be replaced + errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) // Wait for the service allocs to be stopped on the draining node testutil.WaitForResult(func() (bool, error) { @@ -594,6 +615,9 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) } } + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } return true, nil }, func(err error) { t.Fatalf("err: %v", err) @@ -635,7 +659,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { } // Test that transistions to force drain work. -func TestDrainer_Batch_TransistionToForce(t *testing.T) { +func TestDrainer_Batch_TransitionToForce(t *testing.T) { t.Parallel() require := require.New(t) @@ -707,12 +731,17 @@ func TestDrainer_Batch_TransistionToForce(t *testing.T) { require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) // Wait for the allocs to be replaced + errCh := make(chan error, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) // Make sure the batch job isn't affected testutil.AssertUntil(500*time.Millisecond, func() (bool, error) { + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } + allocs, err := state.AllocsByNode(nil, n1.ID) if err != nil { return false, err From 717bd7512ac8882f9bd8c7f12dcda982331dbcc6 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 3 Apr 2018 16:46:35 -0700 Subject: [PATCH 4/4] drain: return on first error If one error is encountered it is unlikely any further attempts will succeed, so fail fast. --- nomad/drainer/drainer.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index 77799b80c..e6be05c7d 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -6,7 +6,6 @@ import ( "sync" "time" - multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -398,14 +397,14 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs // Commit this update via Raft var finalIndex uint64 - var mErr multierror.Error for _, u := range partitionAllocDrain(defaultMaxIdsPerTxn, transistions, evals) { index, err := n.raft.AllocUpdateDesiredTransition(u.Transitions, u.Evals) if err != nil { - mErr.Errors = append(mErr.Errors, err) + future.Respond(0, err) + return } finalIndex = index } - future.Respond(finalIndex, mErr.ErrorOrNil()) + future.Respond(finalIndex, nil) }
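
Note: the following is an illustrative sketch added for review context, not
part of the patch series. The double-close panic fixed in PATCH 1/4 exists
because BatchFuture.Respond unconditionally closes doneCh, so Respond must be
called exactly once per future. Before the fix, drainAllocs called
future.Respond inside the partition loop on a Raft error and then again,
unconditionally, after the loop. The standalone program below reproduces the
failure mode using the BatchFuture code added in PATCH 2/4; the main function
and its error values are hypothetical.

package main

import "fmt"

// BatchFuture is copied from nomad/structs/batch_future.go (PATCH 2/4).
type BatchFuture struct {
	doneCh chan struct{}
	err    error
	index  uint64
}

// NewBatchFuture creates a new batch future.
func NewBatchFuture() *BatchFuture {
	return &BatchFuture{doneCh: make(chan struct{})}
}

// Respond is used to unblock the future. It closes doneCh unconditionally,
// which is why a second call panics with "close of closed channel".
func (b *BatchFuture) Respond(index uint64, err error) {
	b.index = index
	b.err = err
	close(b.doneCh)
}

func main() {
	bf := NewBatchFuture()
	// Pre-patch drainAllocs: Respond called inside the loop on a Raft error...
	bf.Respond(100, fmt.Errorf("raft apply failed"))
	// ...and then again after the loop: the second close panics.
	bf.Respond(101, nil)
}

PATCH 1/4 removes the in-loop Respond by accumulating errors in a
multierror.Error and responding once; PATCH 4/4 then simplifies this to
fail fast, still responding exactly once on the first error.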