From 1492d0c49ae353e8d83ed366e1983c78250bee77 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 21 May 2019 08:50:25 -0400 Subject: [PATCH] exec: allow drivers to handle stream termination Without this change, alloc_endpoint cancel the context passed to handler when we detect EOF. This races driver in setting exit code; and we run into a case where the exec process terminates cleanly yet we attempt to mark it as failed with context error. Here, we rely on the driver to handle errors returned from Stream and without racing to set an error. --- client/alloc_endpoint.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index febad3fbf..26c3c06a8 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -240,7 +240,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e return helper.Int64ToPtr(404), fmt.Errorf("task %q is not running.", req.Task) } - err = h(ctx, req.Cmd, req.Tty, newExecStream(cancel, decoder, encoder)) + err = h(ctx, req.Cmd, req.Tty, newExecStream(decoder, encoder)) if err != nil { code := helper.Int64ToPtr(500) return code, err @@ -250,11 +250,10 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e } // newExecStream returns a new exec stream as expected by drivers that interpolate with RPC streaming format -func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream { +func newExecStream(decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream { buf := new(bytes.Buffer) return &execStream{ - cancelFn: cancelFn, - decoder: decoder, + decoder: decoder, buf: buf, encoder: encoder, @@ -263,8 +262,7 @@ func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encod } type execStream struct { - cancelFn func() - decoder *codec.Decoder + decoder *codec.Decoder encoder *codec.Encoder buf *bytes.Buffer @@ -286,8 +284,5 @@ func (s *execStream) Send(m *drivers.ExecTaskStreamingResponseMsg) error { func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) { req := drivers.ExecTaskStreamingRequestMsg{} err := s.decoder.Decode(&req) - if err == io.EOF || err == io.ErrClosedPipe { - s.cancelFn() - } return &req, err }