From 091d0e3eeb3c37f20f6d7ff8881f0a00d9ea8a1d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 18:16:10 -0700 Subject: [PATCH 1/7] Add travis multiplier --- command/agent/fs_endpoint_test.go | 40 +++++++++++++++---------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 52cc58e54..1804e61b9 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -155,7 +155,7 @@ func TestStreamFramer_Flush(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * bWindow): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * bWindow): t.Fatalf("failed to flush") } @@ -166,7 +166,7 @@ func TestStreamFramer_Flush(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -224,7 +224,7 @@ func TestStreamFramer_Batch(t *testing.T) { select { case <-resultCh: t.Fatalf("Got data before frame size reached") - case <-time.After(bWindow / 2): + case <-time.After(bWindow * time.Duration(testutil.TestMultiplier()) / 2): } // Write the rest so we hit the frame size @@ -235,7 +235,7 @@ func TestStreamFramer_Batch(t *testing.T) { // Ensure we get data select { case <-resultCh: - case <-time.After(2 * bWindow): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * bWindow): t.Fatalf("Did not receive data after batch size reached") } @@ -246,7 +246,7 @@ func TestStreamFramer_Batch(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -285,7 +285,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * hRate): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("failed to heartbeat") } @@ -296,7 +296,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -374,7 +374,7 @@ func TestStreamFramer_Order(t *testing.T) { // Ensure we get data select { case <-resultCh: - case <-time.After(10 * bWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): got := receivedBuf.String() want := expected.String() t.Fatalf("Got %v; want %v", got, want) @@ -387,7 +387,7 @@ func TestStreamFramer_Order(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -535,14 +535,14 @@ func TestHTTP_Stream_Modify(t *testing.T) { // Sleep a little before writing more. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if _, err := f.Write(data[3:]); err != nil { t.Fatalf("write failed: %v", err) } select { case <-resultCh: - case <-time.After(2 * streamBatchWindow): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("failed to send new data") } }) @@ -616,7 +616,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { // Sleep a little before truncating. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if err := f.Truncate(0); err != nil { t.Fatalf("truncate failed: %v", err) } @@ -638,20 +638,20 @@ func TestHTTP_Stream_Truncate(t *testing.T) { select { case <-truncateCh: - case <-time.After(2 * streamBatchWindow): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive truncate") } // Sleep a little before writing more. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if _, err := f2.Write(data[5:]); err != nil { t.Fatalf("write failed: %v", err) } select { case <-dataPostTruncCh: - case <-time.After(2 * streamBatchWindow): + case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive post truncate data") } }) @@ -718,14 +718,14 @@ func TestHTTP_Stream_Delete(t *testing.T) { // Sleep a little before deleting. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if err := os.Remove(streamFilePath); err != nil { t.Fatalf("delete failed: %v", err) } select { case <-deleteCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive delete") } @@ -807,7 +807,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { select { case <-resultCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } @@ -896,7 +896,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { select { case <-firstResultCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } @@ -908,7 +908,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { select { case <-fullResultCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } From e0bb6b415f6ebec3a5eda22ebba5c55b8100018d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 18:17:40 -0700 Subject: [PATCH 2/7] only test command/agent --- scripts/test.sh | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scripts/test.sh b/scripts/test.sh index b9cde8b9a..510f8fa57 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -11,8 +11,9 @@ go build -o $TEMPDIR/nomad || exit 1 # Run the tests echo "--> Running tests" GOBIN="`which go`" -go list ./... | grep -v '^github.com/hashicorp/nomad/vendor/' | \ - sudo \ - -E PATH=$TEMPDIR:$PATH \ - -E GOPATH=$GOPATH \ - xargs $GOBIN test -v ${GOTEST_FLAGS:--cover -timeout=900s} +$GOBIN test -v -timeout=900s github.com/hashicorp/nomad/command/agent +#go list ./... | grep -v '^github.com/hashicorp/nomad/vendor/' | \ + #sudo \ + #-E PATH=$TEMPDIR:$PATH \ + #-E GOPATH=$GOPATH \ + #xargs $GOBIN test -v ${GOTEST_FLAGS:--cover -timeout=900s} From 2de52849db3f43f73dc71019c583ca26fc718796 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 18:30:46 -0700 Subject: [PATCH 3/7] up timeouts --- command/agent/fs_endpoint_test.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 1804e61b9..6e4113f6f 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -155,7 +155,7 @@ func TestStreamFramer_Flush(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * bWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): t.Fatalf("failed to flush") } @@ -166,7 +166,7 @@ func TestStreamFramer_Flush(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -224,7 +224,7 @@ func TestStreamFramer_Batch(t *testing.T) { select { case <-resultCh: t.Fatalf("Got data before frame size reached") - case <-time.After(bWindow * time.Duration(testutil.TestMultiplier()) / 2): + case <-time.After(bWindow / 2): } // Write the rest so we hit the frame size @@ -235,7 +235,7 @@ func TestStreamFramer_Batch(t *testing.T) { // Ensure we get data select { case <-resultCh: - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * bWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): t.Fatalf("Did not receive data after batch size reached") } @@ -246,7 +246,7 @@ func TestStreamFramer_Batch(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -285,7 +285,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("failed to heartbeat") } @@ -296,7 +296,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -387,7 +387,7 @@ func TestStreamFramer_Order(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -542,7 +542,7 @@ func TestHTTP_Stream_Modify(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("failed to send new data") } }) @@ -638,7 +638,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { select { case <-truncateCh: - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive truncate") } @@ -651,7 +651,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { select { case <-dataPostTruncCh: - case <-time.After(2 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive post truncate data") } }) @@ -725,7 +725,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { select { case <-deleteCh: - case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive delete") } @@ -807,7 +807,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { select { case <-resultCh: - case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } @@ -896,7 +896,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { select { case <-firstResultCh: - case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } @@ -908,7 +908,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { select { case <-fullResultCh: - case <-time.After(4 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } From 014d8fe0e8739245f04f00ab170a157d811c7a86 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 18:37:08 -0700 Subject: [PATCH 4/7] lock heartbeat --- command/agent/fs_endpoint.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 3a1ca361a..342098429 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -348,10 +348,12 @@ func (s *StreamFramer) run() { s.l.Unlock() case <-s.heartbeat.C: // Send a heartbeat frame + s.l.Lock() select { case s.outbound <- &StreamFrame{}: default: } + s.l.Unlock() } } }() From 17b9e7ddb4f1afc8bdea0cb4ad32bdad1be6deee Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 18:59:48 -0700 Subject: [PATCH 5/7] only use polling --- client/allocdir/alloc_dir.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 514f05cfb..f9e6c05f9 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "time" "gopkg.in/tomb.v1" @@ -367,12 +366,7 @@ func (d *AllocDir) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*wa // getFileWatcher returns a FileWatcher for the given path. func getFileWatcher(path string) watch.FileWatcher { - if runtime.GOOS == "windows" { - // There are some deadlock issues with the inotify implementation on - // windows. Use polling watcher for now. - return watch.NewPollingFileWatcher(path) - } - return watch.NewInotifyFileWatcher(path) + return watch.NewPollingFileWatcher(path) } func fileCopy(src, dst string, perm os.FileMode) error { From 049a7b030d3ba14545db9e5c110942a1f52210c0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 19:07:13 -0700 Subject: [PATCH 6/7] fs uses timed unblocker like logs command --- command/fs.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/command/fs.go b/command/fs.go index 32a6145b5..14eb621b2 100644 --- a/command/fs.go +++ b/command/fs.go @@ -289,7 +289,7 @@ func (f *FSCommand) Run(args []string) int { // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) } } @@ -324,11 +324,12 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // Create a reader var r io.ReadCloser frameReader := api.NewFrameReader(frames, cancel) + frameReader.SetUnblockTime(500 * time.Millisecond) r = frameReader // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) } go func() { From 7b8e19e166bbc72bb65ce0a1e490825a0343dc2c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 Aug 2016 19:24:37 -0700 Subject: [PATCH 7/7] revert test script --- scripts/test.sh | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/scripts/test.sh b/scripts/test.sh index 510f8fa57..b9cde8b9a 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -11,9 +11,8 @@ go build -o $TEMPDIR/nomad || exit 1 # Run the tests echo "--> Running tests" GOBIN="`which go`" -$GOBIN test -v -timeout=900s github.com/hashicorp/nomad/command/agent -#go list ./... | grep -v '^github.com/hashicorp/nomad/vendor/' | \ - #sudo \ - #-E PATH=$TEMPDIR:$PATH \ - #-E GOPATH=$GOPATH \ - #xargs $GOBIN test -v ${GOTEST_FLAGS:--cover -timeout=900s} +go list ./... | grep -v '^github.com/hashicorp/nomad/vendor/' | \ + sudo \ + -E PATH=$TEMPDIR:$PATH \ + -E GOPATH=$GOPATH \ + xargs $GOBIN test -v ${GOTEST_FLAGS:--cover -timeout=900s}