diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index febad3fbf..26c3c06a8 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -240,7 +240,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e return helper.Int64ToPtr(404), fmt.Errorf("task %q is not running.", req.Task) } - err = h(ctx, req.Cmd, req.Tty, newExecStream(cancel, decoder, encoder)) + err = h(ctx, req.Cmd, req.Tty, newExecStream(decoder, encoder)) if err != nil { code := helper.Int64ToPtr(500) return code, err @@ -250,11 +250,10 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e } // newExecStream returns a new exec stream as expected by drivers that interpolate with RPC streaming format -func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream { +func newExecStream(decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream { buf := new(bytes.Buffer) return &execStream{ - cancelFn: cancelFn, - decoder: decoder, + decoder: decoder, buf: buf, encoder: encoder, @@ -263,8 +262,7 @@ func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encod } type execStream struct { - cancelFn func() - decoder *codec.Decoder + decoder *codec.Decoder encoder *codec.Encoder buf *bytes.Buffer @@ -286,8 +284,5 @@ func (s *execStream) Send(m *drivers.ExecTaskStreamingResponseMsg) error { func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) { req := drivers.ExecTaskStreamingRequestMsg{} err := s.decoder.Decode(&req) - if err == io.EOF || err == io.ErrClosedPipe { - s.cancelFn() - } return &req, err }