diff --git a/api/tasks.go b/api/tasks.go index 7fa476b72..cdcf4e5d4 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -74,6 +74,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup { return g } +// LogConfig provides configuration for log rotation +type LogConfig struct { + MaxFiles int + MaxFileSizeMB int +} + // Task is a single process in a task group. type Task struct { Name string @@ -85,6 +91,7 @@ type Task struct { Resources *Resources Meta map[string]string KillTimeout time.Duration + LogConfig *LogConfig } // NewTask creates and initializes a new Task. @@ -126,6 +133,12 @@ func (t *Task) Constrain(c *Constraint) *Task { return t } +// SetLogConfig sets a log config to a task +func (t *Task) SetLogConfig(l *LogConfig) *Task { + t.LogConfig = l + return t +} + // TaskState tracks the current state of a task and events that caused state // transistions. type TaskState struct { diff --git a/api/util_test.go b/api/util_test.go index 30feafdd3..190e0f3e7 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -26,6 +26,10 @@ func testJob() *Job { MemoryMB: 256, DiskMB: 25, IOPS: 10, + }). + SetLogConfig(&LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, }) group := NewTaskGroup("group1", 1). diff --git a/client/driver/docker.go b/client/driver/docker.go index c50c2b3d8..60bce0fc9 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -14,10 +15,14 @@ import ( docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/driver/logcollector" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -69,12 +74,15 @@ func (c *DockerDriverConfig) Validate() error { } type dockerPID struct { - ImageID string - ContainerID string - KillTimeout time.Duration + ImageID string + ContainerID string + KillTimeout time.Duration + PluginConfig *PluginReattachConfig } type DockerHandle struct { + pluginClient *plugin.Client + logCollector logcollector.LogCollector client *docker.Client logger *log.Logger cleanupContainer bool @@ -173,7 +181,8 @@ func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Ta } // createContainer initializes a struct needed to call docker.client.CreateContainer() -func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, driverConfig *DockerDriverConfig) (docker.CreateContainerOptions, error) { +func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, + driverConfig *DockerDriverConfig, syslogAddr string) (docker.CreateContainerOptions, error) { var c docker.CreateContainerOptions if task.Resources == nil { // Guard against missing resources. We should never have been able to @@ -230,6 +239,12 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri // local directory for storage and a shared alloc directory that can be // used to share data between different tasks in the same task group. Binds: binds, + LogConfig: docker.LogConfig{ + Type: "syslog", + Config: map[string]string{ + "syslog-address": fmt.Sprintf("tcp://%v", syslogAddr), + }, + }, } d.logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"]) @@ -467,11 +482,44 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err) } } + + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID) - config, err := d.createContainer(ctx, task, &driverConfig) + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name)) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "syslog", pluginLogFile), + } + + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + return nil, err + } + logCollectorCtx := &logcollector.LogCollectorContext{ + TaskName: task.Name, + AllocDir: ctx.AllocDir, + LogConfig: task.LogConfig, + PortLowerBound: d.config.ClientMinPort, + PortUpperBound: d.config.ClientMaxPort, + } + ss, err := logCollector.LaunchCollector(logCollectorCtx) + if err != nil { + return nil, fmt.Errorf("failed to start syslog collector: %v", err) + } + d.logger.Printf("Started the syslog server at %v", ss.Addr) + + config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) } // Create a container @@ -490,12 +538,14 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle }) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name) + pluginClient.Kill() return nil, fmt.Errorf("Failed to query list of containers: %s", err) } // Couldn't find any matching containers if len(containers) == 0 { d.logger.Printf("[ERR] driver.docker: failed to get id for container %s: %#v", config.Name, containers) + pluginClient.Kill() return nil, fmt.Errorf("Failed to get id for container %s", config.Name) } @@ -507,6 +557,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle }) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to purge container %s", container.ID) + pluginClient.Kill() return nil, fmt.Errorf("Failed to purge container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: purged container %s", container.ID) @@ -515,11 +566,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle container, err = client.CreateContainer(config) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name) + pluginClient.Kill() return nil, fmt.Errorf("Failed to re-create container %s; aborting", config.Name) } } else { // We failed to create the container for some other reason. d.logger.Printf("[ERR] driver.docker: failed to create container from image %s: %s", image, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to create container from image %s: %s", image, err) } } @@ -529,6 +582,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle err = client.StartContainer(container.ID, container.HostConfig) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) @@ -536,6 +590,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle // Return a driver handle h := &DockerHandle{ client: client, + logCollector: logCollector, + pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, logger: d.logger, @@ -560,8 +616,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } d.logger.Printf("[INFO] driver.docker: re-attaching to docker process: %s", handleID) + pluginConfig := &plugin.ClientConfig{ + Reattach: pid.PluginConfig.PluginConfig(), + } - // Initialize docker API client client, err := d.dockerClient() if err != nil { return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) @@ -586,10 +644,20 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er if !found { return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, err) } + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) + if err != nil { + d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err) + if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil { + d.logger.Printf("[DEBUG] driver.docker: couldn't stop container: %v", e) + } + return nil, err + } // Return a driver handle h := &DockerHandle{ client: client, + logCollector: logCollector, + pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, logger: d.logger, @@ -606,9 +674,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er func (h *DockerHandle) ID() string { // Return a handle to the PID pid := dockerPID{ - ImageID: h.imageID, - ContainerID: h.containerID, - KillTimeout: h.killTimeout, + ImageID: h.imageID, + ContainerID: h.containerID, + KillTimeout: h.killTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), } data, err := json.Marshal(pid) if err != nil { @@ -628,6 +697,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { func (h *DockerHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil { + h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) + } // Update is not possible return nil @@ -699,4 +771,10 @@ func (h *DockerHandle) run() { close(h.doneCh) h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err) close(h.waitCh) + + // Shutdown the syslog collector + if err := h.logCollector.Exit(); err != nil { + h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err) + } + h.pluginClient.Kill() } diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 0eb09ce2e..281076818 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -4,6 +4,8 @@ import ( "fmt" "io/ioutil" "math/rand" + "os" + "os/exec" "path/filepath" "reflect" "runtime/debug" @@ -11,9 +13,11 @@ import ( "time" docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/env" cstructs "github.com/hashicorp/nomad/client/driver/structs" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" ) @@ -67,6 +71,10 @@ func dockerTask() (*structs.Task, int, int) { Config: map[string]interface{}{ "image": "redis", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ MemoryMB: 256, CPU: 512, @@ -125,16 +133,37 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle func TestDockerDriver_Handle(t *testing.T) { t.Parallel() + + bin, err := discover.NomadExecutable() + if err != nil { + t.Fatalf("got an err: %v", err) + } + + f, _ := ioutil.TempFile(os.TempDir(), "") + defer f.Close() + defer os.Remove(f.Name()) + pluginConfig := &plugin.ClientConfig{ + Cmd: exec.Command(bin, "syslog", f.Name()), + } + logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{}) + if err != nil { + t.Fatalf("got an err: %v", err) + } + defer pluginClient.Kill() + h := &DockerHandle{ - imageID: "imageid", - containerID: "containerid", - killTimeout: 5 * time.Nanosecond, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + imageID: "imageid", + logCollector: logCollector, + pluginClient: pluginClient, + containerID: "containerid", + killTimeout: 5 * time.Nanosecond, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() - expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid","KillTimeout":5}` + expected := fmt.Sprintf("DOCKER:{\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}", + pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String()) if actual != expected { t.Errorf("Expected `%s`, found `%s`", expected, actual) } @@ -172,6 +201,10 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { Config: map[string]interface{}{ "image": "redis", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -211,6 +244,10 @@ func TestDockerDriver_Start_Wait(t *testing.T) { MemoryMB: 256, CPU: 512, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, } _, handle, cleanup := dockerSetup(t, task) @@ -254,6 +291,10 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { string(exp), env.AllocDir, file), }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ MemoryMB: 256, CPU: 512, @@ -303,6 +344,10 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"10"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -437,6 +482,10 @@ func TestDockerHostNet(t *testing.T) { MemoryMB: 256, CPU: 512, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, } client, handle, cleanup := dockerSetup(t, task) diff --git a/client/driver/exec.go b/client/driver/exec.go index 35e73dd1b..c00a483b9 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -120,6 +120,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, ResourceLimits: true, FSIsolation: true, UnprivilegedUser: true, @@ -153,7 +154,7 @@ type execId struct { TaskDir string AllocDir *allocdir.AllocDir IsolationConfig *executor.IsolationConfig - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig } func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -203,7 +204,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *execHandle) ID() string { id := execId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, IsolationConfig: h.isolationConfig, @@ -223,6 +224,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { func (h *execHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 85da6bfa5..dcfe56604 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -50,6 +50,10 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"5"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -87,6 +91,10 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) { "command": "/bin/sleep", "args": []string{"1000000"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -144,6 +152,10 @@ func TestExecDriver_Start_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"2"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -188,6 +200,10 @@ func TestExecDriver_Start_Artifact_basic(t *testing.T) { "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum), "command": file, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -232,6 +248,10 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) { "command": "/bin/bash", "args": []string{"-c", fmt.Sprintf("/bin/sleep 1 && %s", file)}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -278,6 +298,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { fmt.Sprintf(`sleep 1; echo -n %s > ${%s}/%s`, string(exp), env.AllocDir, file), }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -324,6 +348,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { "command": "/bin/sleep", "args": []string{"100"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, KillTimeout: 10 * time.Second, } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 2de8e7bca..7ab5bc96e 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "fmt" + "io" "log" "os" "os/exec" @@ -17,13 +18,13 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/client/driver/logrotator" "github.com/hashicorp/nomad/nomad/structs" ) // ExecutorContext holds context to configure the command user // wants to run and isolate it type ExecutorContext struct { - // TaskEnv holds information about the environment of a Task TaskEnv *env.TaskEnvironment @@ -48,6 +49,9 @@ type ExecutorContext struct { // UnprivilegedUser is a flag for drivers to make the process // run as nobody UnprivilegedUser bool + + // LogConfig provides the configuration related to log rotation + LogConfig *structs.LogConfig } // ExecCommand holds the user command and args. It's a lightweight replacement @@ -79,6 +83,7 @@ type Executor interface { Wait() (*ProcessState, error) ShutDown() error Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error } // UniversalExecutor is an implementation of the Executor which launches and @@ -92,6 +97,8 @@ type UniversalExecutor struct { groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} + lre *logrotator.LogRotator + lro *logrotator.LogRotator logger *log.Logger lock sync.Mutex @@ -127,20 +134,29 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } } - // configuring log rotate - stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName)) - stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) - if err != nil { - return nil, err - } - e.cmd.Stdout = stdo + logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName)) - stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + stdor, stdow := io.Pipe() + lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, e.logger) if err != nil { - return nil, err + return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) } - e.cmd.Stderr = stde + e.cmd.Stdout = stdow + e.lro = lro + go lro.Start(stdor) + + stder, stdew := io.Pipe() + lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, e.logger) + if err != nil { + return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) + } + e.cmd.Stderr = stdew + e.lre = lre + go lre.Start(stder) // setting the env, path and args for the command e.ctx.TaskEnv.Build() @@ -169,6 +185,23 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) { return e.exitState, nil } +// UpdateLogConfig updates the log configuration +func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error { + e.ctx.LogConfig = logConfig + if e.lro == nil { + return fmt.Errorf("log rotator for stdout doesn't exist") + } + e.lro.MaxFiles = logConfig.MaxFiles + e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + + if e.lre == nil { + return fmt.Errorf("log rotator for stderr doesn't exist") + } + e.lre.MaxFiles = logConfig.MaxFiles + e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + return nil +} + func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go index 26876e7e9..609e7e64a 100644 --- a/client/driver/executor/executor_test.go +++ b/client/driver/executor/executor_test.go @@ -50,6 +50,7 @@ func testExecutorContext(t *testing.T) *ExecutorContext { TaskName: taskName, AllocDir: allocDir, TaskResources: constraint, + LogConfig: structs.DefaultLogConfig(), } return ctx } @@ -84,7 +85,7 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) { func TestExecutor_Start_Wait(t *testing.T) { execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}} ctx := testExecutorContext(t) - defer ctx.AllocDir.Destroy() + //defer ctx.AllocDir.Destroy() executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)) ps, err := executor.LaunchCmd(&execCmd, ctx) if err != nil { @@ -105,7 +106,7 @@ func TestExecutor_Start_Wait(t *testing.T) { } expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "web.stdout") + file := filepath.Join(allocdir.TaskLocal, "web.stdout.0") absFilePath := filepath.Join(taskDir, file) output, err := ioutil.ReadFile(absFilePath) if err != nil { @@ -149,7 +150,7 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) { } expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "web.stdout") + file := filepath.Join(allocdir.TaskLocal, "web.stdout.0") absFilePath := filepath.Join(taskDir, file) output, err := ioutil.ReadFile(absFilePath) if err != nil { @@ -185,7 +186,7 @@ func TestExecutor_Start_Kill(t *testing.T) { t.Fatalf("No task directory found for task %v", task) } - file := filepath.Join(allocdir.TaskLocal, "web.stdout") + file := filepath.Join(allocdir.TaskLocal, "web.stdout.0") absFilePath := filepath.Join(taskDir, file) time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second) diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index 5d1af4f5e..1189c19dd 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -1,51 +1,14 @@ package driver import ( - "io" "log" - "net" "net/rpc" "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/nomad/structs" ) -var HandshakeConfig = plugin.HandshakeConfig{ - ProtocolVersion: 1, - MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", - MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", -} - -func GetPluginMap(w io.Writer) map[string]plugin.Plugin { - p := new(ExecutorPlugin) - p.logger = log.New(w, "", log.LstdFlags) - return map[string]plugin.Plugin{"executor": p} -} - -// ExecutorReattachConfig is the config that we seralize and de-serialize and -// store in disk -type ExecutorReattachConfig struct { - Pid int - AddrNet string - AddrName string -} - -// PluginConfig returns a config from an ExecutorReattachConfig -func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig { - var addr net.Addr - switch c.AddrNet { - case "unix", "unixgram", "unixpacket": - addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) - case "tcp", "tcp4", "tcp6": - addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) - } - return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} -} - -func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig { - return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} -} - type ExecutorRPC struct { client *rpc.Client } @@ -76,6 +39,10 @@ func (e *ExecutorRPC) Exit() error { return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) } +func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { + return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) +} + type ExecutorRPCServer struct { Impl executor.Executor } @@ -104,6 +71,10 @@ func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error { return e.Impl.Exit() } +func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *interface{}) error { + return e.Impl.UpdateLogConfig(args) +} + type ExecutorPlugin struct { logger *log.Logger Impl *ExecutorRPCServer diff --git a/client/driver/java.go b/client/driver/java.go index 0c206c174..f7d724efc 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -167,9 +167,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, - FSIsolation: true, - ResourceLimits: true, UnprivilegedUser: true, + LogConfig: task.LogConfig, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx) if err != nil { @@ -198,7 +197,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type javaId struct { KillTimeout time.Duration - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig IsolationConfig *executor.IsolationConfig TaskDir string AllocDir *allocdir.AllocDir @@ -255,7 +254,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *javaHandle) ID() string { id := javaId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, TaskDir: h.taskDir, AllocDir: h.allocDir, @@ -276,6 +275,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { func (h *javaHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/java_test.go b/client/driver/java_test.go index a546c611f..7b3fe5076 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -58,6 +58,10 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { "jvm_options": []string{"-Xmx64m", "-Xms32m"}, "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -103,6 +107,10 @@ func TestJavaDriver_Start_Wait(t *testing.T) { "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } @@ -148,6 +156,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { Config: map[string]interface{}{ "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } diff --git a/client/driver/logcollector/collector.go b/client/driver/logcollector/collector.go new file mode 100644 index 000000000..fab901428 --- /dev/null +++ b/client/driver/logcollector/collector.go @@ -0,0 +1,185 @@ +package logcollector + +import ( + "fmt" + "io" + "log" + s1 "log/syslog" + "net" + "path/filepath" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/client/driver/logrotator" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mcuadros/go-syslog" +) + +// LogCollectorContext holds context to configure the syslog server +type LogCollectorContext struct { + // TaskName is the name of the Task + TaskName string + + // AllocDir is the handle to do operations on the alloc dir of + // the task + AllocDir *allocdir.AllocDir + + // LogConfig provides configuration related to log rotation + LogConfig *structs.LogConfig + + // PortUpperBound is the upper bound of the ports that we can use to start + // the syslog server + PortUpperBound uint + + // PortLowerBound is the lower bound of the ports that we can use to start + // the syslog server + PortLowerBound uint +} + +// SyslogCollectorState holds the address and islation information of a launched +// syslog server +type SyslogCollectorState struct { + IsolationConfig *executor.IsolationConfig + Addr string +} + +// LogCollector is an interface which allows a driver to launch a log server +// and update log configuration +type LogCollector interface { + LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) + Exit() error + UpdateLogConfig(logConfig *structs.LogConfig) error +} + +// SyslogCollector is a LogCollector which starts a syslog server and does +// rotation to incoming stream +type SyslogCollector struct { + addr net.Addr + logConfig *structs.LogConfig + ctx *LogCollectorContext + + lro *logrotator.LogRotator + lre *logrotator.LogRotator + server *syslog.Server + taskDir string + + logger *log.Logger +} + +// NewSyslogCollector returns an implementation of the SyslogCollector +func NewSyslogCollector(logger *log.Logger) *SyslogCollector { + return &SyslogCollector{logger: logger} +} + +// LaunchCollector launches a new syslog server and starts writing log lines to +// files and rotates them +func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) { + addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound) + if err != nil { + return nil, err + } + s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", addr) + s.ctx = ctx + // configuring the task dir + if err := s.configureTaskDir(); err != nil { + return nil, err + } + + channel := make(syslog.LogPartsChannel) + handler := syslog.NewChannelHandler(channel) + + s.server = syslog.NewServer() + s.server.SetFormat(&CustomParser{logger: s.logger}) + s.server.SetHandler(handler) + s.server.ListenTCP(addr.String()) + if err := s.server.Boot(); err != nil { + return nil, err + } + logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + + ro, wo := io.Pipe() + lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, s.logger) + if err != nil { + return nil, err + } + s.lro = lro + go lro.Start(ro) + + re, we := io.Pipe() + lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), + fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, + logFileSize, s.logger) + if err != nil { + return nil, err + } + s.lre = lre + go lre.Start(re) + + go func(channel syslog.LogPartsChannel) { + for logParts := range channel { + // If the severity of the log line is err then we write to stderr + // otherwise all messages go to stdout + s := logParts["severity"].(Priority) + if s.Severity == s1.LOG_ERR { + we.Write(logParts["content"].([]byte)) + } else { + wo.Write(logParts["content"].([]byte)) + } + wo.Write([]byte("\n")) + } + }(channel) + go s.server.Wait() + return &SyslogCollectorState{Addr: addr.String()}, nil +} + +// Exit kills the syslog server +func (s *SyslogCollector) Exit() error { + return s.server.Kill() +} + +// UpdateLogConfig updates the log configuration +func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error { + s.ctx.LogConfig = logConfig + if s.lro == nil { + return fmt.Errorf("log rotator for stdout doesn't exist") + } + s.lro.MaxFiles = logConfig.MaxFiles + s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + + if s.lre == nil { + return fmt.Errorf("log rotator for stderr doesn't exist") + } + s.lre.MaxFiles = logConfig.MaxFiles + s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + return nil +} + +// configureTaskDir sets the task dir in the SyslogCollector +func (s *SyslogCollector) configureTaskDir() error { + taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName] + if !ok { + return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName) + } + s.taskDir = taskDir + return nil +} + +// getFreePort returns a free port ready to be listened on between upper and +// lower bounds +func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { + for i := lowerBound; i <= upperBound; i++ { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) + if err != nil { + return nil, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + continue + } + defer l.Close() + return l.Addr(), nil + } + return nil, fmt.Errorf("No free port found") +} diff --git a/client/driver/logcollector/parser.go b/client/driver/logcollector/parser.go new file mode 100644 index 000000000..eeb910a6f --- /dev/null +++ b/client/driver/logcollector/parser.go @@ -0,0 +1,159 @@ +package logcollector + +import ( + "bufio" + "fmt" + "log" + "log/syslog" + "strconv" + "time" + + "github.com/jeromer/syslogparser" +) + +// Errors related to parsing priority +var ( + ErrPriorityNoStart = fmt.Errorf("No start char found for priority") + ErrPriorityEmpty = fmt.Errorf("Priority field empty") + ErrPriorityNoEnd = fmt.Errorf("No end char found for priority") + ErrPriorityTooShort = fmt.Errorf("Priority field too short") + ErrPriorityTooLong = fmt.Errorf("Priority field too long") + ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority") +) + +// Priority header and ending characters +const ( + PRI_PART_START = '<' + PRI_PART_END = '>' +) + +// Priority holds all the priority bits in a syslog log line +type Priority struct { + Pri int + Facility syslog.Priority + Severity syslog.Priority +} + +// DockerLogParser parses a line of log message that the docker daemon ships +type DockerLogParser struct { + line []byte + content []byte + severity Priority + + log *log.Logger +} + +// NewDockerLogParser creates a new DockerLogParser +func NewDockerLogParser(line []byte) *DockerLogParser { + return &DockerLogParser{line: line} +} + +// Parse parses a syslog log line +func (d *DockerLogParser) Parse() error { + severity, _, _ := d.parsePriority(d.line) + msgIdx := d.logContentIndex(d.line) + d.severity = severity + d.content = d.line[msgIdx:] + return nil +} + +// Dump creates a map of the parsed log line and severity +func (d *DockerLogParser) Dump() syslogparser.LogParts { + return map[string]interface{}{ + "content": d.content, + "severity": d.severity, + } +} + +// logContentIndex finds out the index of the start index of the content in a +// syslog line +func (d *DockerLogParser) logContentIndex(line []byte) int { + cursor := 0 + numSpace := 0 + for i := 0; i < len(line); i++ { + if line[i] == ' ' { + numSpace += 1 + if numSpace == 1 { + cursor = i + break + } + } + } + for i := cursor; i < len(line); i++ { + if line[i] == ':' { + cursor = i + break + } + } + return cursor + 1 +} + +// parsePriority parses the priority in a syslog message +func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) { + cursor := 0 + pri := d.newPriority(0) + if len(line) <= 0 { + return pri, cursor, ErrPriorityEmpty + } + if line[cursor] != PRI_PART_START { + return pri, cursor, ErrPriorityNoStart + } + i := 1 + priDigit := 0 + for i < len(line) { + if i >= 5 { + return pri, cursor, ErrPriorityTooLong + } + c := line[i] + if c == PRI_PART_END { + if i == 1 { + return pri, cursor, ErrPriorityTooShort + } + cursor = i + 1 + return d.newPriority(priDigit), cursor, nil + } + if d.isDigit(c) { + v, e := strconv.Atoi(string(c)) + if e != nil { + return pri, cursor, e + } + priDigit = (priDigit * 10) + v + } else { + return pri, cursor, ErrPriorityNonDigit + } + i++ + } + return pri, cursor, ErrPriorityNoEnd +} + +// isDigit checks if a byte is a numeric char +func (d *DockerLogParser) isDigit(c byte) bool { + return c >= '0' && c <= '9' +} + +// newPriority creates a new default priority +func (d *DockerLogParser) newPriority(p int) Priority { + // The Priority value is calculated by first multiplying the Facility + // number by 8 and then adding the numerical value of the Severity. + return Priority{ + Pri: p, + Facility: syslog.Priority(p / 8), + Severity: syslog.Priority(p % 8), + } +} + +func (d *DockerLogParser) Location(location *time.Location) { +} + +// CustomParser is a parser to parse docker syslog lines +type CustomParser struct { + logger *log.Logger +} + +func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser { + return NewDockerLogParser(line) +} + +func (c *CustomParser) GetSplitFunc() bufio.SplitFunc { + return nil +} diff --git a/client/driver/logcollector/parser_test.go b/client/driver/logcollector/parser_test.go new file mode 100644 index 000000000..2c62049c5 --- /dev/null +++ b/client/driver/logcollector/parser_test.go @@ -0,0 +1,24 @@ +package logcollector + +import ( + "log/syslog" + "testing" +) + +func TestLogParser_Priority(t *testing.T) { + line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf") + d := NewDockerLogParser(line) + p, _, err := d.parsePriority(line) + if err != nil { + t.Fatalf("got an err: %v", err) + } + if p.Severity != syslog.LOG_INFO { + t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.Severity) + } + + idx := d.logContentIndex(line) + expected := 68 + if idx != expected { + t.Fatalf("expected idx: %v, got: %v", expected, idx) + } +} diff --git a/client/driver/logs.go b/client/driver/logrotator/logs.go similarity index 61% rename from client/driver/logs.go rename to client/driver/logrotator/logs.go index 7297382cd..a9eecf983 100644 --- a/client/driver/logs.go +++ b/client/driver/logrotator/logs.go @@ -1,4 +1,4 @@ -package driver +package logrotator import ( "fmt" @@ -18,14 +18,16 @@ const ( // LogRotator ingests data and writes out to a rotated set of files type LogRotator struct { - maxFiles int // maximum number of rotated files retained by the log rotator - fileSize int64 // maximum file size of a rotated file + MaxFiles int // maximum number of rotated files retained by the log rotator + FileSize int64 // maximum file size of a rotated file path string // path where the rotated files are created fileName string // base file name of the rotated files - logFileIdx int // index to the current file + logFileIdx int // index to the current file + oldestLogFileIdx int // index to the oldest log file - logger *log.Logger + logger *log.Logger + purgeCh chan struct{} } // NewLogRotator configures and returns a new LogRotator @@ -51,14 +53,18 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l } } - return &LogRotator{ - maxFiles: maxFiles, - fileSize: fileSize, + lr := &LogRotator{ + MaxFiles: maxFiles, + FileSize: fileSize, path: path, fileName: fileName, logFileIdx: logFileIdx, logger: logger, - }, nil + purgeCh: make(chan struct{}, 1), + } + go lr.PurgeOldFiles() + + return lr, nil } // Start reads from a Reader and writes them to files and rotates them when the @@ -67,15 +73,15 @@ func (l *LogRotator) Start(r io.Reader) error { buf := make([]byte, bufSize) for { logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) - remainingSize := l.fileSize + var fileSize int64 if f, err := os.Stat(logFileName); err == nil { // Skipping the current file if it happens to be a directory if f.IsDir() { l.logFileIdx += 1 continue } + fileSize = f.Size() // Calculating the remaining capacity of the log file - remainingSize = l.fileSize - f.Size() } f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -84,17 +90,23 @@ func (l *LogRotator) Start(r io.Reader) error { l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName) // Closing the current log file if it doesn't have any more capacity - if remainingSize <= 0 { - l.logFileIdx = l.logFileIdx + 1 + if fileSize >= l.FileSize { + l.logFileIdx += 1 f.Close() continue } // Reading from the reader and writing into the current log file as long // as it has capacity or the reader closes + totalWritten := 0 for { + if l.FileSize-(fileSize+int64(totalWritten)) < 1 { + f.Close() + break + } var nr int var err error + remainingSize := l.FileSize - (int64(totalWritten) + fileSize) if remainingSize < bufSize { nr, err = r.Read(buf[0:remainingSize]) } else { @@ -113,13 +125,16 @@ func (l *LogRotator) Start(r io.Reader) error { f.Close() return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw) } - remainingSize -= int64(nr) - if remainingSize < 1 { - f.Close() - break - } + totalWritten += nr } l.logFileIdx = l.logFileIdx + 1 + // Purge old files if we have more files than MaxFiles + if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles { + select { + case l.purgeCh <- struct{}{}: + default: + } + } } return nil } @@ -127,29 +142,36 @@ func (l *LogRotator) Start(r io.Reader) error { // PurgeOldFiles removes older files and keeps only the last N files rotated for // a file func (l *LogRotator) PurgeOldFiles() { - var fIndexes []int - files, err := ioutil.ReadDir(l.path) - if err != nil { - return - } - // Inserting all the rotated files in a slice - for _, f := range files { - if strings.HasPrefix(f.Name(), l.fileName) { - fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) - n, err := strconv.Atoi(fileIdx) + for { + select { + case <-l.purgeCh: + var fIndexes []int + files, err := ioutil.ReadDir(l.path) if err != nil { - continue + return } - fIndexes = append(fIndexes, n) + // Inserting all the rotated files in a slice + for _, f := range files { + if strings.HasPrefix(f.Name(), l.fileName) { + fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + fIndexes = append(fIndexes, n) + } + } + + // Sorting the file indexes so that we can purge the older files and keep + // only the number of files as configured by the user + sort.Sort(sort.IntSlice(fIndexes)) + var toDelete []int + toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles] + for _, fIndex := range toDelete { + fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) + os.RemoveAll(fname) + } + l.oldestLogFileIdx = fIndexes[0] } } - - // Sorting the file indexes so that we can purge the older files and keep - // only the number of files as configured by the user - sort.Sort(sort.IntSlice(fIndexes)) - toDelete := fIndexes[l.maxFiles-1 : len(fIndexes)-1] - for _, fIndex := range toDelete { - fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) - os.RemoveAll(fname) - } } diff --git a/client/driver/logs_test.go b/client/driver/logrotator/logs_test.go similarity index 80% rename from client/driver/logs_test.go rename to client/driver/logrotator/logs_test.go index d45ae5fdd..b497ab993 100644 --- a/client/driver/logs_test.go +++ b/client/driver/logrotator/logs_test.go @@ -1,12 +1,14 @@ -package driver +package logrotator import ( + "fmt" "io" "io/ioutil" "log" "os" "path/filepath" "testing" + "time" ) var ( @@ -228,7 +230,10 @@ func TestLogRotator_PurgeDirs(t *testing.T) { r, w := io.Pipe() go func() { - w.Write([]byte("abcdefghijklmno")) + w.Write([]byte("abcdefghijklmnopqrxyz")) + time.Sleep(1 * time.Second) + l.MaxFiles = 1 + w.Write([]byte("abcdefghijklmnopqrxyz")) w.Close() }() @@ -236,13 +241,56 @@ func TestLogRotator_PurgeDirs(t *testing.T) { if err != nil && err != io.EOF { t.Fatalf("failure in logrotator start: %v", err) } - l.PurgeOldFiles() + // sleeping for a second because purging is async + time.Sleep(1 * time.Second) files, err := ioutil.ReadDir(path) if err != nil { t.Fatalf("err: %v", err) } - if len(files) != 2 { - t.Fatalf("expected number of files: %v, actual: %v", 2, len(files)) + expected := 1 + if len(files) != expected { + t.Fatalf("expected number of files: %v, actual: %v", expected, len(files)) + } +} + +func TestLogRotator_UpdateConfig(t *testing.T) { + var path string + var err error + defer os.RemoveAll(path) + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + + l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + r, w := io.Pipe() + go func() { + w.Write([]byte("abcdefg")) + l.FileSize = 5 + w.Write([]byte("hijklmnojkp")) + w.Close() + }() + err = l.Start(r) + if err != nil && err != io.EOF { + t.Fatalf("Failure in logrotator start %v", err) + } + + finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) + finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1")) + if err != nil { + t.Fatal(err) + } + if finfo.Size() != 10 { + t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) + } + if err1 != nil { + t.Fatal(err) + } + if finfo1.Size() != 5 { + t.Fatalf("expected size of file: %v, actual: %v", 5, finfo.Size()) } } diff --git a/client/driver/plugins.go b/client/driver/plugins.go new file mode 100644 index 000000000..4808d81e1 --- /dev/null +++ b/client/driver/plugins.go @@ -0,0 +1,51 @@ +package driver + +import ( + "io" + "log" + "net" + + "github.com/hashicorp/go-plugin" +) + +var HandshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", + MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", +} + +func GetPluginMap(w io.Writer) map[string]plugin.Plugin { + e := new(ExecutorPlugin) + e.logger = log.New(w, "", log.LstdFlags) + + s := new(SyslogCollectorPlugin) + s.logger = log.New(w, "", log.LstdFlags) + return map[string]plugin.Plugin{ + "executor": e, + "syslogcollector": s, + } +} + +// ExecutorReattachConfig is the config that we seralize and de-serialize and +// store in disk +type PluginReattachConfig struct { + Pid int + AddrNet string + AddrName string +} + +// PluginConfig returns a config from an ExecutorReattachConfig +func (c *PluginReattachConfig) PluginConfig() *plugin.ReattachConfig { + var addr net.Addr + switch c.AddrNet { + case "unix", "unixgram", "unixpacket": + addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName) + case "tcp", "tcp4", "tcp6": + addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName) + } + return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr} +} + +func NewPluginReattachConfig(c *plugin.ReattachConfig) *PluginReattachConfig { + return &PluginReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()} +} diff --git a/client/driver/qemu.go b/client/driver/qemu.go index deae7d273..54e28bbbd 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -208,6 +208,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx) if err != nil { @@ -235,7 +236,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, type qemuId struct { KillTimeout time.Duration UserPid int - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig AllocDir *allocdir.AllocDir } @@ -276,7 +277,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro func (h *qemuHandle) ID() string { id := qemuId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, } @@ -295,6 +296,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { func (h *qemuHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index 151fcf7f9..ff163f975 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -49,6 +49,10 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { "web": 8080, }}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: &structs.Resources{ CPU: 500, MemoryMB: 512, @@ -101,6 +105,10 @@ func TestQemuDriver_RequiresMemory(t *testing.T) { "checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544", // ssh u/p would be here }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, } driverCtx, execCtx := testDriverContexts(task) diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index fe25ecb04..6c8adace2 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -114,6 +114,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl AllocDir: ctx.AllocDir, TaskName: task.Name, TaskResources: task.Resources, + LogConfig: task.LogConfig, } ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) if err != nil { @@ -140,7 +141,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl type rawExecId struct { KillTimeout time.Duration UserPid int - PluginConfig *ExecutorReattachConfig + PluginConfig *PluginReattachConfig AllocDir *allocdir.AllocDir } @@ -180,7 +181,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e func (h *rawExecHandle) ID() string { id := rawExecId{ KillTimeout: h.killTimeout, - PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()), + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), UserPid: h.userPid, AllocDir: h.allocDir, } @@ -199,6 +200,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { func (h *rawExecHandle) Update(task *structs.Task) error { // Store the updated kill timeout. h.killTimeout = task.KillTimeout + h.executor.UpdateLogConfig(task.LogConfig) // Update is not possible return nil diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index a09010d3e..942780ae6 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -61,6 +61,10 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { "command": testtask.Path(), "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -109,6 +113,10 @@ func TestRawExecDriver_Start_Artifact_basic(t *testing.T) { "command": file, "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -156,6 +164,10 @@ func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) { "command": file, "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -197,6 +209,10 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { "command": testtask.Path(), "args": []string{"sleep", "1s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -243,6 +259,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { "write", string(exp), outPath, }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) @@ -289,6 +309,10 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { "command": testtask.Path(), "args": []string{"sleep", "45s"}, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, Resources: basicResources, } testtask.SetTaskEnv(task) diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go new file mode 100644 index 000000000..b4cee69fb --- /dev/null +++ b/client/driver/syslog_plugin.go @@ -0,0 +1,69 @@ +package driver + +import ( + "log" + "net/rpc" + + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/nomad/client/driver/logcollector" + "github.com/hashicorp/nomad/nomad/structs" +) + +type SyslogCollectorRPC struct { + client *rpc.Client +} + +type LaunchCollectorArgs struct { + Ctx *logcollector.LogCollectorContext +} + +func (e *SyslogCollectorRPC) LaunchCollector(ctx *logcollector.LogCollectorContext) (*logcollector.SyslogCollectorState, error) { + var ss *logcollector.SyslogCollectorState + err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss) + return ss, err +} + +func (e *SyslogCollectorRPC) Exit() error { + return e.client.Call("Plugin.Exit", new(interface{}), new(interface{})) +} + +func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error { + return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{})) +} + +type SyslogCollectorRPCServer struct { + Impl logcollector.LogCollector +} + +func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, + resp *logcollector.SyslogCollectorState) error { + ss, err := s.Impl.LaunchCollector(args.Ctx) + if ss != nil { + *resp = *ss + } + return err +} + +func (s *SyslogCollectorRPCServer) Exit(args interface{}, resp *interface{}) error { + return s.Impl.Exit() +} + +func (s *SyslogCollectorRPCServer) UpdateLogConfig(logConfig *structs.LogConfig, resp *interface{}) error { + return s.Impl.UpdateLogConfig(logConfig) +} + +type SyslogCollectorPlugin struct { + logger *log.Logger + Impl *SyslogCollectorRPCServer +} + +func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { + if p.Impl == nil { + p.Impl = &SyslogCollectorRPCServer{Impl: logcollector.NewSyslogCollector(p.logger)} + } + return p.Impl, nil +} + +func (p *SyslogCollectorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { + return &SyslogCollectorRPC{client: c}, nil +} diff --git a/client/driver/utils.go b/client/driver/utils.go index 188e467d2..bfb923e1b 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -9,11 +9,13 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/client/driver/logcollector" ) // createExecutor launches an executor plugin and returns an instance of the // Executor interface -func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { +func createExecutor(config *plugin.ClientConfig, w io.Writer, + clientConfig *config.Config) (executor.Executor, *plugin.Client, error) { config.HandshakeConfig = HandshakeConfig config.Plugins = GetPluginMap(w) config.MaxPort = clientConfig.ClientMaxPort @@ -39,6 +41,30 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *conf return executorPlugin, executorClient, nil } +func createLogCollector(config *plugin.ClientConfig, w io.Writer, + clientConfig *config.Config) (logcollector.LogCollector, *plugin.Client, error) { + config.HandshakeConfig = HandshakeConfig + config.Plugins = GetPluginMap(w) + config.MaxPort = clientConfig.ClientMaxPort + config.MinPort = clientConfig.ClientMinPort + if config.Cmd != nil { + isolateCommand(config.Cmd) + } + + syslogClient := plugin.NewClient(config) + rpcCLient, err := syslogClient.Client() + if err != nil { + return nil, nil, fmt.Errorf("error creating rpc client for syslog plugin: %v", err) + } + + raw, err := rpcCLient.Dispense("syslogcollector") + if err != nil { + return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err) + } + logCollector := raw.(logcollector.LogCollector) + return logCollector, syslogClient, nil +} + // killProcess kills a process with the given pid func killProcess(pid int) error { proc, err := os.FindProcess(pid) diff --git a/command/init.go b/command/init.go index a0b7d988f..9b24a2f36 100644 --- a/command/init.go +++ b/command/init.go @@ -158,7 +158,13 @@ job "example" { } } } - + + # Specify configuration related to log rotation + # logs { + # max_files = 10 + # max_file_size = 15 + # } + # Controls the timeout between signalling a task it will be killed # and killing the task. If not set a default is used. # kill_timeout = "20s" diff --git a/command/syslog_plugin.go b/command/syslog_plugin.go new file mode 100644 index 000000000..b10d6217d --- /dev/null +++ b/command/syslog_plugin.go @@ -0,0 +1,43 @@ +package command + +import ( + "os" + "strings" + + "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/client/driver" +) + +type SyslogPluginCommand struct { + Meta +} + +func (e *SyslogPluginCommand) Help() string { + helpText := ` + This is a command used by Nomad internally to launch a syslog collector" + ` + return strings.TrimSpace(helpText) +} + +func (s *SyslogPluginCommand) Synopsis() string { + return "internal - lanch a syslog collector plugin" +} + +func (s *SyslogPluginCommand) Run(args []string) int { + if len(args) == 0 { + s.Ui.Error("log output file isn't provided") + } + logFileName := args[0] + stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + s.Ui.Error(err.Error()) + return 1 + } + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: driver.HandshakeConfig, + Plugins: driver.GetPluginMap(stdo), + }) + + return 0 +} diff --git a/command/util_test.go b/command/util_test.go index c9bd741f6..a94497c37 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -44,6 +44,10 @@ func testJob(jobID string) *api.Job { MemoryMB: 256, DiskMB: 20, CPU: 100, + }). + SetLogConfig(&api.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, }) group := api.NewTaskGroup("group1", 1). diff --git a/commands.go b/commands.go index 6ad1ba3c3..c77c8482a 100644 --- a/commands.go +++ b/commands.go @@ -105,7 +105,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, - + "syslog": func() (cli.Command, error) { + return &command.SyslogPluginCommand{ + Meta: meta, + }, nil + }, "server-force-leave": func() (cli.Command, error) { return &command.ServerForceLeaveCommand{ Meta: meta, diff --git a/jobspec/parse.go b/jobspec/parse.go index 8623a7e43..ffef6bfef 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -401,6 +401,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l delete(m, "service") delete(m, "meta") delete(m, "resources") + delete(m, "logs") // Build the task var t structs.Task @@ -484,6 +485,24 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l t.Resources = &r } + // If we have logs then parse that + logConfig := structs.DefaultLogConfig() + if o := listVal.Filter("logs"); len(o.Items) > 0 { + if len(o.Items) > 1 { + return fmt.Errorf("only one logs block is allowed in a Task. Number of logs block found: %d", len(o.Items)) + } + var m map[string]interface{} + logsBlock := o.Items[0] + if err := hcl.DecodeObject(&m, logsBlock.Val); err != nil { + return err + } + + if err := mapstructure.WeakDecode(m, &logConfig); err != nil { + return err + } + } + t.LogConfig = logConfig + *result = append(*result, &t) } diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 399bb4a32..00e5476c7 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -58,6 +58,10 @@ func TestParse(t *testing.T) { Meta: map[string]string{ "my-cool-key": "foobar", }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, }, }, }, @@ -123,6 +127,10 @@ func TestParse(t *testing.T) { }, }, KillTimeout: 22 * time.Second, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 100, + }, }, &structs.Task{ Name: "storagelocker", @@ -143,6 +151,10 @@ func TestParse(t *testing.T) { Operand: "=", }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, }, }, }, @@ -284,6 +296,10 @@ func TestParse(t *testing.T) { }, }, }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, }, }, }, diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 352aeb7cd..84413e3c1 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -42,6 +42,10 @@ job "binstore-storagelocker" { config { image = "hashicorp/binstore" } + logs { + max_files = 10 + max_file_size = 100 + } env { HELLO = "world" LOREM = "ipsum" diff --git a/main.go b/main.go index 1e3558658..11d237709 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int { for k, _ := range commands { switch k { case "executor": + case "syslog": default: commandsInclude = append(commandsInclude, k) } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index c0d6705ef..233bad589 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -102,6 +102,7 @@ func Job() *structs.Job { PortLabel: "admin", }, }, + LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, @@ -177,6 +178,7 @@ func SystemJob() *structs.Job { }, }, }, + LogConfig: structs.DefaultLogConfig(), }, }, }, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fe81774d2..10f72e7a6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1533,6 +1533,32 @@ const ( DefaultKillTimeout = 5 * time.Second ) +// LogConfig provides configuration for log rotation +type LogConfig struct { + MaxFiles int `mapstructure:"max_files"` + MaxFileSizeMB int `mapstructure:"max_file_size"` +} + +func DefaultLogConfig() *LogConfig { + return &LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + } +} + +// Validate returns an error if the log config specified are less than +// the minimum allowed. +func (l *LogConfig) Validate() error { + var mErr multierror.Error + if l.MaxFiles < 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 1; got %d", l.MaxFiles)) + } + if l.MaxFileSizeMB < 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 1MB; got %d", l.MaxFileSizeMB)) + } + return mErr.ErrorOrNil() +} + // Task is a single process typically that is executed as part of a task group. type Task struct { // Name of the task @@ -1564,6 +1590,9 @@ type Task struct { // KillTimeout is the time between signaling a task that it will be // killed and killing it. KillTimeout time.Duration `mapstructure:"kill_timeout"` + + // LogConfig provides configuration for log rotation + LogConfig *LogConfig `mapstructure:"logs"` } func (t *Task) Copy() *Task { @@ -1754,6 +1783,13 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } + // Validate the log config + if t.LogConfig == nil { + mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) + } else if err := t.LogConfig.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + for idx, constr := range t.Constraints { if err := constr.Validate(); err != nil { outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err) @@ -1766,6 +1802,10 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } } + + if t.Resources.DiskMB <= (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) { + mErr.Errors = append(mErr.Errors, fmt.Errorf("log storage exceeds requested disk capacity")) + } return mErr.ErrorOrNil() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index a5cce7730..f77f6a540 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -258,6 +258,7 @@ func TestTask_Validate(t *testing.T) { MemoryMB: 100, IOPS: 10, }, + LogConfig: DefaultLogConfig(), } err = task.Validate() if err != nil { @@ -265,6 +266,21 @@ func TestTask_Validate(t *testing.T) { } } +func TestTask_Validate_LogConfig(t *testing.T) { + task := &Task{ + LogConfig: DefaultLogConfig(), + Resources: &Resources{ + DiskMB: 1, + }, + } + + err := task.Validate() + mErr := err.(*multierror.Error) + if !strings.Contains(mErr.Errors[3].Error(), "log storage") { + t.Fatalf("err: %s", err) + } +} + func TestConstraint_Validate(t *testing.T) { c := &Constraint{} err := c.Validate()