From 9904463da2aeac8d1e2d9a89e70e48a7f4f2aa3d Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 10 Jan 2019 14:20:18 -0500 Subject: [PATCH] executor: fix failing stats related test --- client/allocrunner/taskrunner/stats_hook.go | 4 -- client/structs/structs.go | 3 -- drivers/docker/config.go | 2 + drivers/shared/executor/client.go | 19 ++------ drivers/shared/executor/executor_test.go | 50 ++++++++++++++----- drivers/shared/executor/server.go | 53 +++++++++++---------- plugins/drivers/client.go | 19 ++------ 7 files changed, 72 insertions(+), 78 deletions(-) diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 9870f71cd..bf5bece54 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -121,10 +121,6 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf continue } - if ru.Err != nil { - h.logger.Warn("stats collection for task failed", "error", err) - } - // Update stats on TaskRunner and emit them h.updater.UpdateStats(ru) diff --git a/client/structs/structs.go b/client/structs/structs.go index d8ee3f954..f57c1d67c 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -236,9 +236,6 @@ type TaskResourceUsage struct { ResourceUsage *ResourceUsage Timestamp int64 // UnixNano Pids map[string]*ResourceUsage - - // Err is set by the driver if an error occurred during an RPC stream - Err error } // AllocResourceUsage holds the aggregated task resource usage of the diff --git a/drivers/docker/config.go b/drivers/docker/config.go index 52931b42d..2c1e5567a 100644 --- a/drivers/docker/config.go +++ b/drivers/docker/config.go @@ -131,6 +131,7 @@ var ( // and is used to parse the contents of the 'plugin "docker" {...}' block. // Example: // plugin "docker" { + // config { // endpoint = "unix:///var/run/docker.sock" // auth { // config = "/etc/docker-auth.json" @@ -152,6 +153,7 @@ var ( // } // allow_privileged = false // allow_caps = ["CHOWN", "NET_RAW" ... ] + // } // } configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ "endpoint": hclspec.NewAttr("endpoint", "string", false), diff --git a/drivers/shared/executor/client.go b/drivers/shared/executor/client.go index 67698425f..1e44788f6 100644 --- a/drivers/shared/executor/client.go +++ b/drivers/shared/executor/client.go @@ -14,7 +14,6 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/executor/proto" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared/grpcutils" ) var _ Executor = (*grpcExecutorClient)(nil) @@ -121,14 +120,7 @@ func (c *grpcExecutorClient) handleStats(ctx context.Context, stream proto.Execu resp, err := stream.Recv() if err != nil { if err != io.EOF { - c.logger.Error("error receiving stream from Stats executor RPC", "error", err) - truErr := &cstructs.TaskResourceUsage{ - Err: grpcutils.HandleReqCtxGrpcErr(err, ctx, c.doneCtx), - } - select { - case ch <- truErr: - case <-ctx.Done(): - } + c.logger.Error("error receiving stream from Stats executor RPC, closing stream", "error", err) } // End stream @@ -137,13 +129,8 @@ func (c *grpcExecutorClient) handleStats(ctx context.Context, stream proto.Execu stats, err := drivers.TaskStatsFromProto(resp.Stats) if err != nil { - truErr := &cstructs.TaskResourceUsage{ - Err: fmt.Errorf("failed to decode stats from RPC: %v", err), - } - select { - case ch <- truErr: - case <-ctx.Done(): - } + c.logger.Error("failed to decode stats from RPC", "error", err, "stats", resp.Stats) + continue } select { diff --git a/drivers/shared/executor/executor_test.go b/drivers/shared/executor/executor_test.go index dc59f4579..76db5816f 100644 --- a/drivers/shared/executor/executor_test.go +++ b/drivers/shared/executor/executor_test.go @@ -78,6 +78,8 @@ func TestExecutor_Start_Invalid(pt *testing.T) { pt.Parallel() invalid := "/bin/foobar" for name, factory := range executorFactories { + name := name + factory := factory pt.Run(name, func(t *testing.T) { require := require.New(t) execCmd, allocDir := testExecutorCommand(t) @@ -96,6 +98,8 @@ func TestExecutor_Start_Invalid(pt *testing.T) { func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) { pt.Parallel() for name, factory := range executorFactories { + name := name + factory := factory pt.Run(name, func(t *testing.T) { require := require.New(t) execCmd, allocDir := testExecutorCommand(t) @@ -118,6 +122,8 @@ func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) { func TestExecutor_Start_Wait(pt *testing.T) { pt.Parallel() for name, factory := range executorFactories { + name := name + factory := factory pt.Run(name, func(t *testing.T) { require := require.New(t) execCmd, allocDir := testExecutorCommand(t) @@ -155,6 +161,8 @@ func TestExecutor_Start_Wait(pt *testing.T) { func TestExecutor_WaitExitSignal(pt *testing.T) { pt.Parallel() for name, factory := range executorFactories { + name := name + factory := factory pt.Run(name, func(t *testing.T) { require := require.New(t) execCmd, allocDir := testExecutorCommand(t) @@ -170,18 +178,32 @@ func TestExecutor_WaitExitSignal(pt *testing.T) { go func() { // Give process time to start time.Sleep(time.Second) - ch, err := executor.Stats(context.Background(), time.Second) - assert.NoError(t, err) - select { - case <-time.After(time.Second): - assert.Fail(t, "stats failed to send on interval") - case ru := <-ch: - assert.NotEmpty(t, ru.Pids) - } - proc, err := os.FindProcess(ps.Pid) - assert.NoError(t, err) - err = proc.Signal(syscall.SIGKILL) - assert.NoError(t, err) + tu.WaitForResult(func() (bool, error) { + ch, err := executor.Stats(context.Background(), time.Second) + if err != nil { + return false, err + } + select { + case <-time.After(time.Second): + return false, fmt.Errorf("stats failed to send on interval") + case ru := <-ch: + if len(ru.Pids) == 0 { + return false, fmt.Errorf("no pids recorded in stats") + } + } + proc, err := os.FindProcess(ps.Pid) + if err != nil { + return false, err + } + err = proc.Signal(syscall.SIGKILL) + if err != nil { + return false, err + } + return true, nil + }, func(err error) { + assert.NoError(t, err) + executor.Shutdown("SIGINT", 0) + }) }() ps, err = executor.Wait(context.Background()) @@ -194,6 +216,8 @@ func TestExecutor_WaitExitSignal(pt *testing.T) { func TestExecutor_Start_Kill(pt *testing.T) { pt.Parallel() for name, factory := range executorFactories { + name := name + factory := factory pt.Run(name, func(t *testing.T) { require := require.New(t) execCmd, allocDir := testExecutorCommand(t) @@ -296,7 +320,7 @@ func TestUniversalExecutor_LookupPath(t *testing.T) { // than mounting the underlying host filesystem func setupRootfs(t *testing.T, rootfs string) { paths := []string{ - "/bin/sh", + // "/bin/sh", "/bin/sleep", "/bin/echo", "/bin/date", diff --git a/drivers/shared/executor/server.go b/drivers/shared/executor/server.go index 3cda738cd..b9db752d6 100644 --- a/drivers/shared/executor/server.go +++ b/drivers/shared/executor/server.go @@ -6,8 +6,12 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/hashicorp/nomad/drivers/shared/executor/proto" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" + sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto" "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type grpcExecutorServer struct { @@ -88,45 +92,42 @@ func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*p } func (s *grpcExecutorServer) Stats(req *proto.StatsRequest, stream proto.Executor_StatsServer) error { - ctx := stream.Context() - interval := time.Duration(req.Interval) - if interval.Nanoseconds() == 0 { + if interval == 0 { interval = time.Second } outCh, err := s.impl.Stats(stream.Context(), time.Duration(req.Interval)) if err != nil { + if rec, ok := err.(structs.Recoverable); ok { + st := status.New(codes.FailedPrecondition, rec.Error()) + st, err := st.WithDetails(&sproto.RecoverableError{Recoverable: rec.IsRecoverable()}) + if err != nil { + // If this error, it will always error + panic(err) + } + return st.Err() + } return err } - for { + for resp := range outCh { + pbStats, err := drivers.TaskStatsToProto(resp) + if err != nil { + return err + } - select { - case <-ctx.Done(): - return nil - case resp, ok := <-outCh: - // chan closed, end stream - if !ok { - return nil - } - - pbStats, err := drivers.TaskStatsToProto(resp) - if err != nil { - return err - } - - presp := &proto.StatsResponse{ - Stats: pbStats, - } - - // Send the stats - if err := stream.Send(presp); err != nil { - return err - } + presp := &proto.StatsResponse{ + Stats: pbStats, + } + // Send the stats + if err := stream.Send(presp); err != nil { + return err } } + + return nil } func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) { diff --git a/plugins/drivers/client.go b/plugins/drivers/client.go index 10d5135f9..d5225e384 100644 --- a/plugins/drivers/client.go +++ b/plugins/drivers/client.go @@ -3,7 +3,6 @@ package drivers import ( "context" "errors" - "fmt" "io" "time" @@ -285,14 +284,7 @@ func (d *driverPluginClient) handleStats(ctx context.Context, ch chan<- *cstruct resp, err := stream.Recv() if err != nil { if err != io.EOF { - d.logger.Error("error receiving stream from TaskStats driver RPC", "error", err) - truErr := &cstructs.TaskResourceUsage{ - Err: grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx), - } - select { - case ch <- truErr: - case <-ctx.Done(): - } + d.logger.Error("error receiving stream from TaskStats driver RPC, closing stream", "error", err) } // End of stream @@ -301,13 +293,8 @@ func (d *driverPluginClient) handleStats(ctx context.Context, ch chan<- *cstruct stats, err := TaskStatsFromProto(resp.Stats) if err != nil { - truErr := &cstructs.TaskResourceUsage{ - Err: fmt.Errorf("failed to decode stats from RPC: %v", err), - } - select { - case ch <- truErr: - case <-ctx.Done(): - } + d.logger.Error("failed to decode stats from RPC", "error", err, "stats", resp.Stats) + continue } select {