mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
Merge pull request #6820 from hashicorp/f-skip-docker-logging-knob
driver: allow disabling log collection
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -28,6 +29,8 @@ const (
|
||||
|
||||
// logmonHook launches logmon and manages task logging
|
||||
type logmonHook struct {
|
||||
runner *TaskRunner
|
||||
|
||||
// logmon is the handle to the log monitor process for the task.
|
||||
logmon logmon.LogMon
|
||||
logmonPluginClient *plugin.Client
|
||||
@@ -43,9 +46,10 @@ type logmonHookConfig struct {
|
||||
stderrFifo string
|
||||
}
|
||||
|
||||
func newLogMonHook(cfg *logmonHookConfig, logger hclog.Logger) *logmonHook {
|
||||
func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook {
|
||||
hook := &logmonHook{
|
||||
config: cfg,
|
||||
runner: tr,
|
||||
config: tr.logmonHookConfig,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
@@ -99,6 +103,11 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,
|
||||
func (h *logmonHook) Prestart(ctx context.Context,
|
||||
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
|
||||
|
||||
if h.isLoggingDisabled() {
|
||||
h.logger.Debug("logging is disabled by driver")
|
||||
return nil
|
||||
}
|
||||
|
||||
attempts := 0
|
||||
for {
|
||||
err := h.prestartOneLoop(ctx, req)
|
||||
@@ -130,6 +139,16 @@ func (h *logmonHook) Prestart(ctx context.Context,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *logmonHook) isLoggingDisabled() bool {
|
||||
ic, ok := h.runner.driver.(drivers.InternalCapabilitiesDriver)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
caps := ic.InternalCapabilities()
|
||||
return caps.DisableLogCollection
|
||||
}
|
||||
|
||||
func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
|
||||
// attach to a running logmon if state indicates one
|
||||
if h.logmonPluginClient == nil {
|
||||
|
||||
@@ -72,7 +72,8 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
|
||||
}()
|
||||
|
||||
hookConf := newLogMonHookConfig(task.Name, dir)
|
||||
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
|
||||
runner := &TaskRunner{logmonHookConfig: hookConf}
|
||||
hook := newLogMonHook(runner, testlog.HCLogger(t))
|
||||
|
||||
req := interfaces.TaskPrestartRequest{
|
||||
Task: task,
|
||||
|
||||
@@ -36,7 +36,8 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {
|
||||
}()
|
||||
|
||||
hookConf := newLogMonHookConfig(task.Name, dir)
|
||||
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
|
||||
runner := &TaskRunner{logmonHookConfig: hookConf}
|
||||
hook := newLogMonHook(runner, testlog.HCLogger(t))
|
||||
|
||||
req := interfaces.TaskPrestartRequest{
|
||||
Task: task,
|
||||
@@ -104,7 +105,8 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {
|
||||
}()
|
||||
|
||||
hookConf := newLogMonHookConfig(task.Name, dir)
|
||||
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
|
||||
runner := &TaskRunner{logmonHookConfig: hookConf}
|
||||
hook := newLogMonHook(runner, testlog.HCLogger(t))
|
||||
|
||||
req := interfaces.TaskPrestartRequest{
|
||||
Task: task,
|
||||
|
||||
@@ -61,7 +61,7 @@ func (tr *TaskRunner) initHooks() {
|
||||
tr.runnerHooks = []interfaces.TaskHook{
|
||||
newValidateHook(tr.clientConfig, hookLogger),
|
||||
newTaskDirHook(tr, hookLogger),
|
||||
newLogMonHook(tr.logmonHookConfig, hookLogger),
|
||||
newLogMonHook(tr, hookLogger),
|
||||
newDispatchHook(alloc, hookLogger),
|
||||
newVolumeHook(tr, hookLogger),
|
||||
newArtifactHook(tr, hookLogger),
|
||||
|
||||
@@ -257,6 +257,10 @@ var (
|
||||
hclspec.NewAttr("infra_image", "string", false),
|
||||
hclspec.NewLiteral(`"gcr.io/google_containers/pause-amd64:3.0"`),
|
||||
),
|
||||
|
||||
// disable_log_collection indicates whether docker driver should collect logs of docker
|
||||
// task containers. If true, nomad doesn't start docker_logger/logmon processes
|
||||
"disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false),
|
||||
})
|
||||
|
||||
// taskConfigSpec is the hcl specification for the driver config section of
|
||||
@@ -549,15 +553,16 @@ type ContainerGCConfig struct {
|
||||
}
|
||||
|
||||
type DriverConfig struct {
|
||||
Endpoint string `codec:"endpoint"`
|
||||
Auth AuthConfig `codec:"auth"`
|
||||
TLS TLSConfig `codec:"tls"`
|
||||
GC GCConfig `codec:"gc"`
|
||||
Volumes VolumeConfig `codec:"volumes"`
|
||||
AllowPrivileged bool `codec:"allow_privileged"`
|
||||
AllowCaps []string `codec:"allow_caps"`
|
||||
GPURuntimeName string `codec:"nvidia_runtime"`
|
||||
InfraImage string `codec:"infra_image"`
|
||||
Endpoint string `codec:"endpoint"`
|
||||
Auth AuthConfig `codec:"auth"`
|
||||
TLS TLSConfig `codec:"tls"`
|
||||
GC GCConfig `codec:"gc"`
|
||||
Volumes VolumeConfig `codec:"volumes"`
|
||||
AllowPrivileged bool `codec:"allow_privileged"`
|
||||
AllowCaps []string `codec:"allow_caps"`
|
||||
GPURuntimeName string `codec:"nvidia_runtime"`
|
||||
InfraImage string `codec:"infra_image"`
|
||||
DisableLogCollection bool `codec:"disable_log_collection"`
|
||||
}
|
||||
|
||||
type AuthConfig struct {
|
||||
@@ -660,3 +665,11 @@ func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
|
||||
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
|
||||
return capabilities, nil
|
||||
}
|
||||
|
||||
var _ drivers.InternalCapabilitiesDriver = (*Driver)(nil)
|
||||
|
||||
func (d *Driver) InternalCapabilities() drivers.InternalCapabilities {
|
||||
return drivers.InternalCapabilities{
|
||||
DisableLogCollection: d.config != nil && d.config.DisableLogCollection,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
|
||||
"github.com/hashicorp/nomad/plugins/drivers"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -488,3 +489,38 @@ func TestConfig_DriverConfig_DanglingContainers(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfig_InternalCapabilities(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
config string
|
||||
expected drivers.InternalCapabilities
|
||||
}{
|
||||
{
|
||||
name: "pure default",
|
||||
config: `{}`,
|
||||
expected: drivers.InternalCapabilities{},
|
||||
},
|
||||
{
|
||||
name: "disabled",
|
||||
config: `{ disable_log_collection = true }`,
|
||||
expected: drivers.InternalCapabilities{DisableLogCollection: true},
|
||||
},
|
||||
{
|
||||
name: "enabled explicitly",
|
||||
config: `{ disable_log_collection = false }`,
|
||||
expected: drivers.InternalCapabilities{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
var tc DriverConfig
|
||||
hclutils.NewConfigParser(configSpec).ParseHCL(t, "config "+c.config, &tc)
|
||||
|
||||
d := &Driver{config: &tc}
|
||||
require.Equal(t, c.expected, d.InternalCapabilities())
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -209,23 +209,25 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||
net: handleState.DriverNetwork,
|
||||
}
|
||||
|
||||
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
|
||||
if err != nil {
|
||||
d.logger.Warn("failed to reattach to docker logger process", "error", err)
|
||||
|
||||
h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
|
||||
if !d.config.DisableLogCollection {
|
||||
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
|
||||
if err != nil {
|
||||
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
|
||||
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
|
||||
}
|
||||
return fmt.Errorf("failed to setup replacement docker logger: %v", err)
|
||||
}
|
||||
d.logger.Warn("failed to reattach to docker logger process", "error", err)
|
||||
|
||||
if err := handle.SetDriverState(h.buildState()); err != nil {
|
||||
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
|
||||
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
|
||||
h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
|
||||
if err != nil {
|
||||
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
|
||||
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
|
||||
}
|
||||
return fmt.Errorf("failed to setup replacement docker logger: %v", err)
|
||||
}
|
||||
|
||||
if err := handle.SetDriverState(h.buildState()); err != nil {
|
||||
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
|
||||
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
|
||||
}
|
||||
return fmt.Errorf("failed to store driver state: %v", err)
|
||||
}
|
||||
return fmt.Errorf("failed to store driver state: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -334,11 +336,18 @@ CREATE:
|
||||
container.ID, "container_state", container.State.String())
|
||||
}
|
||||
|
||||
dlogger, pluginClient, err := d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
|
||||
if err != nil {
|
||||
d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
|
||||
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
|
||||
return nil, nil, err
|
||||
collectingLogs := !d.config.DisableLogCollection
|
||||
|
||||
var dlogger docklog.DockerLogger
|
||||
var pluginClient *plugin.Client
|
||||
|
||||
if collectingLogs {
|
||||
dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
|
||||
if err != nil {
|
||||
d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
|
||||
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Detect container address
|
||||
@@ -368,8 +377,10 @@ CREATE:
|
||||
|
||||
if err := handle.SetDriverState(h.buildState()); err != nil {
|
||||
d.logger.Error("error encoding container occurred after startup, terminating container", "container_id", container.ID, "error", err)
|
||||
dlogger.Stop()
|
||||
pluginClient.Kill()
|
||||
if collectingLogs {
|
||||
dlogger.Stop()
|
||||
pluginClient.Kill()
|
||||
}
|
||||
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
@@ -51,11 +51,14 @@ type taskHandleState struct {
|
||||
}
|
||||
|
||||
func (h *taskHandle) buildState() *taskHandleState {
|
||||
return &taskHandleState{
|
||||
ReattachConfig: pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig()),
|
||||
ContainerID: h.containerID,
|
||||
DriverNetwork: h.net,
|
||||
s := &taskHandleState{
|
||||
ContainerID: h.containerID,
|
||||
DriverNetwork: h.net,
|
||||
}
|
||||
if h.dloggerPluginClient != nil {
|
||||
s.ReattachConfig = pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig())
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*drivers.ExecTaskResult, error) {
|
||||
@@ -171,6 +174,10 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal os.Signal) error {
|
||||
}
|
||||
|
||||
func (h *taskHandle) shutdownLogger() {
|
||||
if h.dlogger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.dlogger.Stop(); err != nil {
|
||||
h.logger.Error("failed to stop docker logger process during StopTask",
|
||||
"error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid)
|
||||
|
||||
@@ -527,3 +527,18 @@ type ExecTaskStream interface {
|
||||
|
||||
type ExecTaskStreamingRequestMsg = proto.ExecTaskStreamingRequest
|
||||
type ExecTaskStreamingResponseMsg = proto.ExecTaskStreamingResponse
|
||||
|
||||
// InternalCapabilitiesDriver is an experimental interface enabling a driver
|
||||
// to disable some nomad functionality (e.g. logs or metrics).
|
||||
//
|
||||
// Intended for internal drivers only while the interface is stabalized.
|
||||
type InternalCapabilitiesDriver interface {
|
||||
InternalCapabilities() InternalCapabilities
|
||||
}
|
||||
|
||||
// InternalCapabilities flags disabled functionality.
|
||||
// Zero value means all is supported.
|
||||
type InternalCapabilities struct {
|
||||
DisableLogCollection bool
|
||||
DisableMetricsCollection bool
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user