exec: allow drivers to handle stream termination

Without this change, alloc_endpoint cancel the context passed to handler
when we detect EOF.  This races driver in setting exit code; and we run
into a case where the exec process terminates cleanly yet we attempt to
mark it as failed with context error.

Here, we rely on the driver to handle errors returned from Stream and
without racing to set an error.
This commit is contained in:
Mahmood Ali
2019-05-21 08:50:25 -04:00
parent 59946ff70f
commit 1492d0c49a

View File

@@ -240,7 +240,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e
return helper.Int64ToPtr(404), fmt.Errorf("task %q is not running.", req.Task)
}
err = h(ctx, req.Cmd, req.Tty, newExecStream(cancel, decoder, encoder))
err = h(ctx, req.Cmd, req.Tty, newExecStream(decoder, encoder))
if err != nil {
code := helper.Int64ToPtr(500)
return code, err
@@ -250,11 +250,10 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e
}
// newExecStream returns a new exec stream as expected by drivers that interpolate with RPC streaming format
func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream {
func newExecStream(decoder *codec.Decoder, encoder *codec.Encoder) drivers.ExecTaskStream {
buf := new(bytes.Buffer)
return &execStream{
cancelFn: cancelFn,
decoder: decoder,
decoder: decoder,
buf: buf,
encoder: encoder,
@@ -263,8 +262,7 @@ func newExecStream(cancelFn func(), decoder *codec.Decoder, encoder *codec.Encod
}
type execStream struct {
cancelFn func()
decoder *codec.Decoder
decoder *codec.Decoder
encoder *codec.Encoder
buf *bytes.Buffer
@@ -286,8 +284,5 @@ func (s *execStream) Send(m *drivers.ExecTaskStreamingResponseMsg) error {
func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) {
req := drivers.ExecTaskStreamingRequestMsg{}
err := s.decoder.Decode(&req)
if err == io.EOF || err == io.ErrClosedPipe {
s.cancelFn()
}
return &req, err
}