executor: fix failing stats related test
@@ -121,10 +121,6 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf
				continue
			}

			if ru.Err != nil {
				h.logger.Warn("stats collection for task failed", "error", err)
			}

			// Update stats on TaskRunner and emit them
			h.updater.UpdateStats(ru)

@@ -236,9 +236,6 @@ type TaskResourceUsage struct {
	ResourceUsage *ResourceUsage
	Timestamp     int64 // UnixNano
	Pids          map[string]*ResourceUsage

	// Err is set by the driver if an error occurred during an RPC stream
	Err error
}

// AllocResourceUsage holds the aggregated task resource usage of the

@@ -131,6 +131,7 @@ var (
	// and is used to parse the contents of the 'plugin "docker" {...}' block.
	// Example:
	// plugin "docker" {
	//   config {
	//     endpoint = "unix:///var/run/docker.sock"
	//     auth {
	//       config = "/etc/docker-auth.json"
@@ -152,6 +153,7 @@ var (
	//     }
	//     allow_privileged = false
	//     allow_caps = ["CHOWN", "NET_RAW" ... ]
	//   }
	// }
	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"endpoint": hclspec.NewAttr("endpoint", "string", false),

@@ -14,7 +14,6 @@ import (
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/drivers/shared/executor/proto"
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/plugins/shared/grpcutils"
)

var _ Executor = (*grpcExecutorClient)(nil)

@@ -121,14 +120,7 @@ func (c *grpcExecutorClient) handleStats(ctx context.Context, stream proto.Execu
		resp, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				c.logger.Error("error receiving stream from Stats executor RPC", "error", err)
				truErr := &cstructs.TaskResourceUsage{
					Err: grpcutils.HandleReqCtxGrpcErr(err, ctx, c.doneCtx),
				}
				select {
				case ch <- truErr:
				case <-ctx.Done():
				}
				c.logger.Error("error receiving stream from Stats executor RPC, closing stream", "error", err)
			}

			// End stream

@@ -137,13 +129,8 @@ func (c *grpcExecutorClient) handleStats(ctx context.Context, stream proto.Execu

		stats, err := drivers.TaskStatsFromProto(resp.Stats)
		if err != nil {
			truErr := &cstructs.TaskResourceUsage{
				Err: fmt.Errorf("failed to decode stats from RPC: %v", err),
			}
			select {
			case ch <- truErr:
			case <-ctx.Done():
			}
			c.logger.Error("failed to decode stats from RPC", "error", err, "stats", resp.Stats)
			continue
		}

		select {

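Since the viewer has interleaved the removed error-forwarding block with the surviving log-and-return path in the two hunks above, here is a minimal, self-contained sketch of the receive/forward loop that handleStats implements: receive until EOF or error, forward each sample to the output channel, and respect context cancellation. All type and helper names below are illustrative stand-ins, not Nomad's actual API.

package main

import (
	"context"
	"fmt"
	"io"
	"log"
)

// taskResourceUsage mirrors the shape of cstructs.TaskResourceUsage (illustrative stand-in).
type taskResourceUsage struct {
	CPUPercent float64
	Err        error
}

// statsStream is a stand-in for the generated gRPC stats stream client.
type statsStream interface {
	Recv() (*taskResourceUsage, error)
}

// handleStats receives from the stream until EOF, error, or cancellation,
// forwarding each sample to ch without blocking past context cancellation.
func handleStats(ctx context.Context, stream statsStream, ch chan<- *taskResourceUsage) {
	defer close(ch)
	for {
		resp, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				log.Printf("error receiving stats stream: %v", err)
			}
			// End stream on EOF or any receive error.
			return
		}

		// Decode failures in the real code are reported in-band via the Err
		// field rather than tearing the whole stream down.
		select {
		case ch <- resp:
		case <-ctx.Done():
			return
		}
	}
}

// fakeStream yields two samples and then EOF, standing in for a live RPC.
type fakeStream struct{ n int }

func (f *fakeStream) Recv() (*taskResourceUsage, error) {
	if f.n >= 2 {
		return nil, io.EOF
	}
	f.n++
	return &taskResourceUsage{CPUPercent: float64(10 * f.n)}, nil
}

func main() {
	ch := make(chan *taskResourceUsage)
	go handleStats(context.Background(), &fakeStream{}, ch)
	for ru := range ch {
		if ru.Err != nil {
			fmt.Println("in-band stream error:", ru.Err)
			continue
		}
		fmt.Printf("cpu=%.0f%%\n", ru.CPUPercent)
	}
}
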
@@ -78,6 +78,8 @@ func TestExecutor_Start_Invalid(pt *testing.T) {
	pt.Parallel()
	invalid := "/bin/foobar"
	for name, factory := range executorFactories {
		name := name
		factory := factory
		pt.Run(name, func(t *testing.T) {
			require := require.New(t)
			execCmd, allocDir := testExecutorCommand(t)

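The name := name and factory := factory lines added here, and in the test hunks that follow, rebind the range variables before each pt.Run call. That is the usual Go fix when a subtest closure may run after the loop has moved on (range variables were reused per iteration before Go 1.22). A minimal illustration with hypothetical test data:

package example

import "testing"

func TestFactories(t *testing.T) {
	factories := map[string]func() int{
		"one": func() int { return 1 },
		"two": func() int { return 2 },
	}
	for name, factory := range factories {
		name := name       // rebind so each subtest closure gets its own copy
		factory := factory // (pre-Go 1.22, range variables are reused per iteration)
		t.Run(name, func(t *testing.T) {
			t.Parallel() // the subtest body may execute after the loop has advanced
			if factory() == 0 {
				t.Fatalf("%s returned zero", name)
			}
		})
	}
}
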
@@ -96,6 +98,8 @@ func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) {
func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) {
	pt.Parallel()
	for name, factory := range executorFactories {
		name := name
		factory := factory
		pt.Run(name, func(t *testing.T) {
			require := require.New(t)
			execCmd, allocDir := testExecutorCommand(t)

@@ -118,6 +122,8 @@ func TestExecutor_Start_Wait_Failure_Code(pt *testing.T) {
func TestExecutor_Start_Wait(pt *testing.T) {
	pt.Parallel()
	for name, factory := range executorFactories {
		name := name
		factory := factory
		pt.Run(name, func(t *testing.T) {
			require := require.New(t)
			execCmd, allocDir := testExecutorCommand(t)

@@ -155,6 +161,8 @@ func TestExecutor_Start_Wait(pt *testing.T) {
func TestExecutor_WaitExitSignal(pt *testing.T) {
	pt.Parallel()
	for name, factory := range executorFactories {
		name := name
		factory := factory
		pt.Run(name, func(t *testing.T) {
			require := require.New(t)
			execCmd, allocDir := testExecutorCommand(t)

@@ -170,18 +178,32 @@ func TestExecutor_WaitExitSignal(pt *testing.T) {
			go func() {
				// Give process time to start
				time.Sleep(time.Second)
				ch, err := executor.Stats(context.Background(), time.Second)
				assert.NoError(t, err)
				select {
				case <-time.After(time.Second):
					assert.Fail(t, "stats failed to send on interval")
				case ru := <-ch:
					assert.NotEmpty(t, ru.Pids)
				}
				proc, err := os.FindProcess(ps.Pid)
				assert.NoError(t, err)
				err = proc.Signal(syscall.SIGKILL)
				assert.NoError(t, err)
				tu.WaitForResult(func() (bool, error) {
					ch, err := executor.Stats(context.Background(), time.Second)
					if err != nil {
						return false, err
					}
					select {
					case <-time.After(time.Second):
						return false, fmt.Errorf("stats failed to send on interval")
					case ru := <-ch:
						if len(ru.Pids) == 0 {
							return false, fmt.Errorf("no pids recorded in stats")
						}
					}
					proc, err := os.FindProcess(ps.Pid)
					if err != nil {
						return false, err
					}
					err = proc.Signal(syscall.SIGKILL)
					if err != nil {
						return false, err
					}
					return true, nil
				}, func(err error) {
					assert.NoError(t, err)
					executor.Shutdown("SIGINT", 0)
				})
			}()

			ps, err = executor.Wait(context.Background())

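The hunk above replaces the single-shot assert-based stats check with tu.WaitForResult, which retries the check until it holds instead of asserting once after a fixed sleep; that polling is presumably the fix the commit title refers to. A rough sketch of that retry helper and its use, assuming the Nomad testutil signature WaitForResult(func() (bool, error), func(error)); the deadline and retry interval below are made up for illustration.

package example

import (
	"fmt"
	"testing"
	"time"
)

// waitForResult polls test until it succeeds or a deadline passes, then hands
// the final error to onErr — the same shape as Nomad's testutil.WaitForResult.
func waitForResult(test func() (bool, error), onErr func(error)) {
	deadline := time.Now().Add(5 * time.Second) // illustrative deadline
	var err error
	for time.Now().Before(deadline) {
		var ok bool
		if ok, err = test(); ok {
			return
		}
		time.Sleep(100 * time.Millisecond) // illustrative retry interval
	}
	onErr(err)
}

func TestStatsEventuallyReported(t *testing.T) {
	start := time.Now()
	waitForResult(func() (bool, error) {
		// Stand-in for polling executor.Stats and checking ru.Pids.
		if time.Since(start) < 300*time.Millisecond {
			return false, fmt.Errorf("no pids recorded in stats yet")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("stats never became available: %v", err)
	})
}
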
@@ -194,6 +216,8 @@ func TestExecutor_WaitExitSignal(pt *testing.T) {
func TestExecutor_Start_Kill(pt *testing.T) {
	pt.Parallel()
	for name, factory := range executorFactories {
		name := name
		factory := factory
		pt.Run(name, func(t *testing.T) {
			require := require.New(t)
			execCmd, allocDir := testExecutorCommand(t)

@@ -296,7 +320,7 @@ func TestUniversalExecutor_LookupPath(t *testing.T) {
// than mounting the underlying host filesystem
func setupRootfs(t *testing.T, rootfs string) {
	paths := []string{
		"/bin/sh",
		// "/bin/sh",
		"/bin/sleep",
		"/bin/echo",
		"/bin/date",

@@ -6,8 +6,12 @@ import (

	"github.com/golang/protobuf/ptypes"
	"github.com/hashicorp/nomad/drivers/shared/executor/proto"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
	sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto"
	"golang.org/x/net/context"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type grpcExecutorServer struct {

@@ -88,45 +92,42 @@ func (s *grpcExecutorServer) Version(context.Context, *proto.VersionRequest) (*p
}

func (s *grpcExecutorServer) Stats(req *proto.StatsRequest, stream proto.Executor_StatsServer) error {
	ctx := stream.Context()

	interval := time.Duration(req.Interval)
	if interval.Nanoseconds() == 0 {
	if interval == 0 {
		interval = time.Second
	}

	outCh, err := s.impl.Stats(stream.Context(), time.Duration(req.Interval))
	if err != nil {
		if rec, ok := err.(structs.Recoverable); ok {
			st := status.New(codes.FailedPrecondition, rec.Error())
			st, err := st.WithDetails(&sproto.RecoverableError{Recoverable: rec.IsRecoverable()})
			if err != nil {
				// If this error, it will always error
				panic(err)
			}
			return st.Err()
		}
		return err
	}

	for {
	for resp := range outCh {
		pbStats, err := drivers.TaskStatsToProto(resp)
		if err != nil {
			return err
		}

		select {
		case <-ctx.Done():
			return nil
		case resp, ok := <-outCh:
			// chan closed, end stream
			if !ok {
				return nil
			}

			pbStats, err := drivers.TaskStatsToProto(resp)
			if err != nil {
				return err
			}

			presp := &proto.StatsResponse{
				Stats: pbStats,
			}

			// Send the stats
			if err := stream.Send(presp); err != nil {
				return err
			}
		presp := &proto.StatsResponse{
			Stats: pbStats,
		}

		// Send the stats
		if err := stream.Send(presp); err != nil {
			return err
		}
	}

	return nil
}

func (s *grpcExecutorServer) Signal(ctx context.Context, req *proto.SignalRequest) (*proto.SignalResponse, error) {

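Because the removed and added lines of the server's Stats handler are interleaved above, here is a compact, self-contained sketch of the select-based streaming loop the new code appears to converge on: stop on context cancellation or when the stats channel closes, otherwise convert each sample and send it on the stream. All types below are illustrative stand-ins for the generated gRPC and driver types.

package main

import (
	"context"
	"fmt"
)

// statsSample and statsResponse stand in for the driver stats and proto wire types.
type statsSample struct{ CPUPercent float64 }

type statsResponse struct{ CPUPercent float64 }

// statsSender is the subset of a gRPC server stream this loop needs.
type statsSender interface {
	Send(*statsResponse) error
	Context() context.Context
}

// streamStats drains outCh onto the stream until the channel closes or the
// stream's context is cancelled.
func streamStats(outCh <-chan *statsSample, stream statsSender) error {
	ctx := stream.Context()
	for {
		select {
		case <-ctx.Done():
			return nil
		case sample, ok := <-outCh:
			// Channel closed: the producer is done, end the stream cleanly.
			if !ok {
				return nil
			}

			// Convert to the wire type (TaskStatsToProto in the real code).
			resp := &statsResponse{CPUPercent: sample.CPUPercent}

			// Send the stats; a send error ends the RPC with that error.
			if err := stream.Send(resp); err != nil {
				return err
			}
		}
	}
}

// fakeSender records what would have gone out on the wire.
type fakeSender struct {
	ctx  context.Context
	sent []*statsResponse
}

func (f *fakeSender) Send(r *statsResponse) error { f.sent = append(f.sent, r); return nil }
func (f *fakeSender) Context() context.Context    { return f.ctx }

func main() {
	outCh := make(chan *statsSample, 2)
	outCh <- &statsSample{CPUPercent: 10}
	outCh <- &statsSample{CPUPercent: 20}
	close(outCh)

	s := &fakeSender{ctx: context.Background()}
	if err := streamStats(outCh, s); err != nil {
		panic(err)
	}
	fmt.Println("sent", len(s.sent), "stats responses")
}
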
@@ -3,7 +3,6 @@ package drivers
import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

@@ -285,14 +284,7 @@ func (d *driverPluginClient) handleStats(ctx context.Context, ch chan<- *cstruct
		resp, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				d.logger.Error("error receiving stream from TaskStats driver RPC", "error", err)
				truErr := &cstructs.TaskResourceUsage{
					Err: grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx),
				}
				select {
				case ch <- truErr:
				case <-ctx.Done():
				}
				d.logger.Error("error receiving stream from TaskStats driver RPC, closing stream", "error", err)
			}

			// End of stream

@@ -301,13 +293,8 @@ func (d *driverPluginClient) handleStats(ctx context.Context, ch chan<- *cstruct

		stats, err := TaskStatsFromProto(resp.Stats)
		if err != nil {
			truErr := &cstructs.TaskResourceUsage{
				Err: fmt.Errorf("failed to decode stats from RPC: %v", err),
			}
			select {
			case ch <- truErr:
			case <-ctx.Done():
			}
			d.logger.Error("failed to decode stats from RPC", "error", err, "stats", resp.Stats)
			continue
		}

		select {