diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 514f05cfb..f9e6c05f9 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "time" "gopkg.in/tomb.v1" @@ -367,12 +366,7 @@ func (d *AllocDir) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*wa // getFileWatcher returns a FileWatcher for the given path. func getFileWatcher(path string) watch.FileWatcher { - if runtime.GOOS == "windows" { - // There are some deadlock issues with the inotify implementation on - // windows. Use polling watcher for now. - return watch.NewPollingFileWatcher(path) - } - return watch.NewInotifyFileWatcher(path) + return watch.NewPollingFileWatcher(path) } func fileCopy(src, dst string, perm os.FileMode) error { diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 3a1ca361a..342098429 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -348,10 +348,12 @@ func (s *StreamFramer) run() { s.l.Unlock() case <-s.heartbeat.C: // Send a heartbeat frame + s.l.Lock() select { case s.outbound <- &StreamFrame{}: default: } + s.l.Unlock() } } }() diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 52cc58e54..6e4113f6f 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -155,7 +155,7 @@ func TestStreamFramer_Flush(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * bWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): t.Fatalf("failed to flush") } @@ -166,7 +166,7 @@ func TestStreamFramer_Flush(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -235,7 +235,7 @@ func TestStreamFramer_Batch(t *testing.T) { // Ensure we get data select { case <-resultCh: - case <-time.After(2 * bWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): t.Fatalf("Did not receive data after batch size reached") } @@ -246,7 +246,7 @@ func TestStreamFramer_Batch(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -285,7 +285,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { select { case <-resultCh: - case <-time.After(2 * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("failed to heartbeat") } @@ -296,7 +296,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -374,7 +374,7 @@ func TestStreamFramer_Order(t *testing.T) { // Ensure we get data select { case <-resultCh: - case <-time.After(10 * bWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): got := receivedBuf.String() want := expected.String() t.Fatalf("Got %v; want %v", got, want) @@ -387,7 +387,7 @@ func TestStreamFramer_Order(t *testing.T) { select { case <-sf.ExitCh(): - case <-time.After(2 * hRate): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate): t.Fatalf("exit channel should close") } @@ -535,14 +535,14 @@ func TestHTTP_Stream_Modify(t *testing.T) { // Sleep a little before writing more. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if _, err := f.Write(data[3:]); err != nil { t.Fatalf("write failed: %v", err) } select { case <-resultCh: - case <-time.After(2 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("failed to send new data") } }) @@ -616,7 +616,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { // Sleep a little before truncating. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if err := f.Truncate(0); err != nil { t.Fatalf("truncate failed: %v", err) } @@ -638,20 +638,20 @@ func TestHTTP_Stream_Truncate(t *testing.T) { select { case <-truncateCh: - case <-time.After(2 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive truncate") } // Sleep a little before writing more. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if _, err := f2.Write(data[5:]); err != nil { t.Fatalf("write failed: %v", err) } select { case <-dataPostTruncCh: - case <-time.After(2 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive post truncate data") } }) @@ -718,14 +718,14 @@ func TestHTTP_Stream_Delete(t *testing.T) { // Sleep a little before deleting. This lets us check if the watch // is working. - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Duration(testutil.TestMultiplier()) * time.Second) if err := os.Remove(streamFilePath); err != nil { t.Fatalf("delete failed: %v", err) } select { case <-deleteCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive delete") } @@ -807,7 +807,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { select { case <-resultCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } @@ -896,7 +896,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { select { case <-firstResultCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } @@ -908,7 +908,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { select { case <-fullResultCh: - case <-time.After(4 * streamBatchWindow): + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * streamBatchWindow): t.Fatalf("did not receive data: got %q", string(received)) } diff --git a/command/fs.go b/command/fs.go index 32a6145b5..14eb621b2 100644 --- a/command/fs.go +++ b/command/fs.go @@ -289,7 +289,7 @@ func (f *FSCommand) Run(args []string) int { // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) } } @@ -324,11 +324,12 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // Create a reader var r io.ReadCloser frameReader := api.NewFrameReader(frames, cancel) + frameReader.SetUnblockTime(500 * time.Millisecond) r = frameReader // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) } go func() {