Merge pull request #763 from hashicorp/f-logrotator

Implemented log rotation for drivers
Diptanu Choudhury
2016-02-11 15:02:39 -08:00
34 changed files with 1092 additions and 126 deletions

View File

@@ -74,6 +74,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup {
return g
}
// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles int
MaxFileSizeMB int
}
// Task is a single process in a task group.
type Task struct {
Name string
@@ -85,6 +91,7 @@ type Task struct {
Resources *Resources
Meta map[string]string
KillTimeout time.Duration
LogConfig *LogConfig
}
// NewTask creates and initializes a new Task.
@@ -126,6 +133,12 @@ func (t *Task) Constrain(c *Constraint) *Task {
return t
}
// SetLogConfig sets a log config to a task
func (t *Task) SetLogConfig(l *LogConfig) *Task {
t.LogConfig = l
return t
}
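Taken together, these api helpers let a job author attach rotation limits fluently. A minimal usage sketch, assuming only the NewTask constructor already present in this package:

    task := NewTask("redis", "docker").
        SetLogConfig(&LogConfig{
            MaxFiles:      5,  // keep at most 5 rotated files per stream
            MaxFileSizeMB: 20, // rotate once a file reaches 20 MB
        })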
// TaskState tracks the current state of a task and events that caused state
// transitions.
type TaskState struct {

View File

@@ -26,6 +26,10 @@ func testJob() *Job {
MemoryMB: 256,
DiskMB: 25,
IOPS: 10,
}).
SetLogConfig(&LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
})
group := NewTaskGroup("group1", 1).

View File

@@ -6,6 +6,7 @@ import (
"log"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
@@ -14,10 +15,14 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/logcollector"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
@@ -69,12 +74,15 @@ func (c *DockerDriverConfig) Validate() error {
}
type dockerPID struct {
ImageID string
ContainerID string
KillTimeout time.Duration
ImageID string
ContainerID string
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
}
type DockerHandle struct {
pluginClient *plugin.Client
logCollector logcollector.LogCollector
client *docker.Client
logger *log.Logger
cleanupContainer bool
@@ -173,7 +181,8 @@ func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Ta
}
// createContainer initializes a struct needed to call docker.client.CreateContainer()
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, driverConfig *DockerDriverConfig) (docker.CreateContainerOptions, error) {
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,
driverConfig *DockerDriverConfig, syslogAddr string) (docker.CreateContainerOptions, error) {
var c docker.CreateContainerOptions
if task.Resources == nil {
// Guard against missing resources. We should never have been able to
@@ -230,6 +239,12 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
// local directory for storage and a shared alloc directory that can be
// used to share data between different tasks in the same task group.
Binds: binds,
LogConfig: docker.LogConfig{
Type: "syslog",
Config: map[string]string{
"syslog-address": fmt.Sprintf("tcp://%v", syslogAddr),
},
},
}
d.logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"])
@@ -467,11 +482,44 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
}
}
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
config, err := d.createContainer(ctx, task, &driverConfig)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "syslog", pluginLogFile),
}
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
logCollectorCtx := &logcollector.LogCollectorContext{
TaskName: task.Name,
AllocDir: ctx.AllocDir,
LogConfig: task.LogConfig,
PortLowerBound: d.config.ClientMinPort,
PortUpperBound: d.config.ClientMaxPort,
}
ss, err := logCollector.LaunchCollector(logCollectorCtx)
if err != nil {
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
}
d.logger.Printf("Started the syslog server at %v", ss.Addr)
config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)
}
// Create a container
@@ -490,12 +538,14 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
})
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to query list of containers: %s", err)
}
// Couldn't find any matching containers
if len(containers) == 0 {
d.logger.Printf("[ERR] driver.docker: failed to get id for container %s: %#v", config.Name, containers)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to get id for container %s", config.Name)
}
@@ -507,6 +557,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
})
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to purge container %s", container.ID)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to purge container %s: %s", container.ID, err)
}
d.logger.Printf("[INFO] driver.docker: purged container %s", container.ID)
@@ -515,11 +566,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
container, err = client.CreateContainer(config)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to re-create container %s; aborting", config.Name)
}
} else {
// We failed to create the container for some other reason.
d.logger.Printf("[ERR] driver.docker: failed to create container from image %s: %s", image, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container from image %s: %s", image, err)
}
}
@@ -529,6 +582,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
err = client.StartContainer(container.ID, container.HostConfig)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
@@ -536,6 +590,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
// Return a driver handle
h := &DockerHandle{
client: client,
logCollector: logCollector,
pluginClient: pluginClient,
cleanupContainer: cleanupContainer,
cleanupImage: cleanupImage,
logger: d.logger,
@@ -560,8 +616,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
d.logger.Printf("[INFO] driver.docker: re-attaching to docker process: %s", handleID)
pluginConfig := &plugin.ClientConfig{
Reattach: pid.PluginConfig.PluginConfig(),
}
// Initialize docker API client
client, err := d.dockerClient()
if err != nil {
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
@@ -586,10 +644,20 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
if !found {
return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, err)
}
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err)
if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil {
d.logger.Printf("[DEBUG] driver.docker: couldn't stop container: %v", e)
}
return nil, err
}
// Return a driver handle
h := &DockerHandle{
client: client,
logCollector: logCollector,
pluginClient: pluginClient,
cleanupContainer: cleanupContainer,
cleanupImage: cleanupImage,
logger: d.logger,
@@ -606,9 +674,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
func (h *DockerHandle) ID() string {
// Return a handle to the PID
pid := dockerPID{
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
}
data, err := json.Marshal(pid)
if err != nil {
@@ -628,6 +697,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
}
// Update is not possible
return nil
@@ -699,4 +771,10 @@ func (h *DockerHandle) run() {
close(h.doneCh)
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
close(h.waitCh)
// Shutdown the syslog collector
if err := h.logCollector.Exit(); err != nil {
h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err)
}
h.pluginClient.Kill()
}
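Condensed, the Start path above does three things: re-exec the nomad binary as a syslog collector plugin, ask the collector for the TCP address it bound, and point Docker's syslog log driver at that address. A sketch of the wiring, using only helpers that appear in this diff (error handling elided):

    bin, _ := discover.NomadExecutable()
    pluginConfig := &plugin.ClientConfig{
        // the collector runs out-of-process as "nomad syslog <logfile>"
        Cmd: exec.Command(bin, "syslog", pluginLogFile),
    }
    logCollector, pluginClient, _ := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
    ss, _ := logCollector.LaunchCollector(logCollectorCtx) // binds a port in [ClientMinPort, ClientMaxPort]
    hostConfig := &docker.HostConfig{
        LogConfig: docker.LogConfig{
            Type:   "syslog",
            Config: map[string]string{"syslog-address": fmt.Sprintf("tcp://%v", ss.Addr)},
        },
    }

Note how every error path after the plugin launch calls pluginClient.Kill(), so a failed start never leaks the collector process.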

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path/filepath"
"reflect"
"runtime/debug"
@@ -11,9 +13,11 @@ import (
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
@@ -67,6 +71,10 @@ func dockerTask() (*structs.Task, int, int) {
Config: map[string]interface{}{
"image": "redis",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
@@ -125,16 +133,37 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle
func TestDockerDriver_Handle(t *testing.T) {
t.Parallel()
bin, err := discover.NomadExecutable()
if err != nil {
t.Fatalf("got an err: %v", err)
}
f, _ := ioutil.TempFile(os.TempDir(), "")
defer f.Close()
defer os.Remove(f.Name())
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "syslog", f.Name()),
}
logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{})
if err != nil {
t.Fatalf("got an err: %v", err)
}
defer pluginClient.Kill()
h := &DockerHandle{
imageID: "imageid",
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
imageID: "imageid",
logCollector: logCollector,
pluginClient: pluginClient,
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
actual := h.ID()
expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid","KillTimeout":5}`
expected := fmt.Sprintf("DOCKER:{\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String())
if actual != expected {
t.Errorf("Expected `%s`, found `%s`", expected, actual)
}
@@ -172,6 +201,10 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
Config: map[string]interface{}{
"image": "redis",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -211,6 +244,10 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
MemoryMB: 256,
CPU: 512,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
}
_, handle, cleanup := dockerSetup(t, task)
@@ -254,6 +291,10 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
string(exp), env.AllocDir, file),
},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
@@ -303,6 +344,10 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
"command": "/bin/sleep",
"args": []string{"10"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -437,6 +482,10 @@ func TestDockerHostNet(t *testing.T) {
MemoryMB: 256,
CPU: 512,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
}
client, handle, cleanup := dockerSetup(t, task)

View File

@@ -120,6 +120,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
LogConfig: task.LogConfig,
ResourceLimits: true,
FSIsolation: true,
UnprivilegedUser: true,
@@ -153,7 +154,7 @@ type execId struct {
TaskDir string
AllocDir *allocdir.AllocDir
IsolationConfig *executor.IsolationConfig
PluginConfig *ExecutorReattachConfig
PluginConfig *PluginReattachConfig
}
func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@@ -203,7 +204,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
func (h *execHandle) ID() string {
id := execId{
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
IsolationConfig: h.isolationConfig,
@@ -223,6 +224,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.executor.UpdateLogConfig(task.LogConfig)
// Update is not possible
return nil

View File

@@ -50,6 +50,10 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
"command": "/bin/sleep",
"args": []string{"5"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -87,6 +91,10 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
"command": "/bin/sleep",
"args": []string{"1000000"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -144,6 +152,10 @@ func TestExecDriver_Start_Wait(t *testing.T) {
"command": "/bin/sleep",
"args": []string{"2"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -188,6 +200,10 @@ func TestExecDriver_Start_Artifact_basic(t *testing.T) {
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum),
"command": file,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -232,6 +248,10 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) {
"command": "/bin/bash",
"args": []string{"-c", fmt.Sprintf("/bin/sleep 1 && %s", file)},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -278,6 +298,10 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
fmt.Sprintf(`sleep 1; echo -n %s > ${%s}/%s`, string(exp), env.AllocDir, file),
},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -324,6 +348,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
"command": "/bin/sleep",
"args": []string{"100"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
KillTimeout: 10 * time.Second,
}

View File

@@ -2,6 +2,7 @@ package executor
import (
"fmt"
"io"
"log"
"os"
"os/exec"
@@ -17,13 +18,13 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/driver/logrotator"
"github.com/hashicorp/nomad/nomad/structs"
)
// ExecutorContext holds the context used to configure and isolate the
// command the user wants to run
type ExecutorContext struct {
// TaskEnv holds information about the environment of a Task
TaskEnv *env.TaskEnvironment
@@ -48,6 +49,9 @@ type ExecutorContext struct {
// UnprivilegedUser is a flag for drivers to make the process
// run as nobody
UnprivilegedUser bool
// LogConfig provides the configuration related to log rotation
LogConfig *structs.LogConfig
}
// ExecCommand holds the user command and args. It's a lightweight replacement
@@ -79,6 +83,7 @@ type Executor interface {
Wait() (*ProcessState, error)
ShutDown() error
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
}
// UniversalExecutor is an implementation of the Executor which launches and
@@ -92,6 +97,8 @@ type UniversalExecutor struct {
groups *cgroupConfig.Cgroup
exitState *ProcessState
processExited chan interface{}
lre *logrotator.LogRotator
lro *logrotator.LogRotator
logger *log.Logger
lock sync.Mutex
@@ -127,20 +134,29 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
}
}
// configuring log rotation
stdoPath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.TaskName))
stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
e.cmd.Stdout = stdo
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
stdePath := filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.TaskName))
stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
stdor, stdow := io.Pipe()
lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, e.logger)
if err != nil {
return nil, err
return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err)
}
e.cmd.Stderr = stde
e.cmd.Stdout = stdow
e.lro = lro
go lro.Start(stdor)
stder, stdew := io.Pipe()
lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, e.logger)
if err != nil {
return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err)
}
e.cmd.Stderr = stdew
e.lre = lre
go lre.Start(stder)
// setting the env, path and args for the command
e.ctx.TaskEnv.Build()
@@ -169,6 +185,23 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) {
return e.exitState, nil
}
// UpdateLogConfig updates the log configuration
func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error {
e.ctx.LogConfig = logConfig
if e.lro == nil {
return fmt.Errorf("log rotator for stdout doesn't exist")
}
e.lro.MaxFiles = logConfig.MaxFiles
e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
if e.lre == nil {
return fmt.Errorf("log rotator for stderr doesn't exist")
}
e.lre.MaxFiles = logConfig.MaxFiles
e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
return nil
}
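The executor now routes each stream through an in-process pipe instead of an os.File: the command writes into the pipe, and a LogRotator goroutine drains it into size-capped, indexed files. The pattern in isolation, a minimal sketch assuming the logrotator package from this diff:

    r, w := io.Pipe()
    lro, err := logrotator.NewLogRotator("/tmp/logs", "demo.stdout",
        3 /* MaxFiles */, 1<<20 /* 1 MB per file */, logger)
    if err != nil {
        return err
    }
    go lro.Start(r) // drains the pipe into demo.stdout.0, demo.stdout.1, ...
    cmd.Stdout = w  // the *exec.Cmd output now flows through the rotator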
func (e *UniversalExecutor) wait() {
defer close(e.processExited)
err := e.cmd.Wait()

View File

@@ -50,6 +50,7 @@ func testExecutorContext(t *testing.T) *ExecutorContext {
TaskName: taskName,
AllocDir: allocDir,
TaskResources: constraint,
LogConfig: structs.DefaultLogConfig(),
}
return ctx
}
@@ -84,7 +85,7 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) {
func TestExecutor_Start_Wait(t *testing.T) {
execCmd := ExecCommand{Cmd: "/bin/echo", Args: []string{"hello world"}}
ctx := testExecutorContext(t)
defer ctx.AllocDir.Destroy()
//defer ctx.AllocDir.Destroy()
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
ps, err := executor.LaunchCmd(&execCmd, ctx)
if err != nil {
@@ -105,7 +106,7 @@ func TestExecutor_Start_Wait(t *testing.T) {
}
expected := "hello world"
file := filepath.Join(allocdir.TaskLocal, "web.stdout")
file := filepath.Join(allocdir.TaskLocal, "web.stdout.0")
absFilePath := filepath.Join(taskDir, file)
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
@@ -149,7 +150,7 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
}
expected := "hello world"
file := filepath.Join(allocdir.TaskLocal, "web.stdout")
file := filepath.Join(allocdir.TaskLocal, "web.stdout.0")
absFilePath := filepath.Join(taskDir, file)
output, err := ioutil.ReadFile(absFilePath)
if err != nil {
@@ -185,7 +186,7 @@ func TestExecutor_Start_Kill(t *testing.T) {
t.Fatalf("No task directory found for task %v", task)
}
file := filepath.Join(allocdir.TaskLocal, "web.stdout")
file := filepath.Join(allocdir.TaskLocal, "web.stdout.0")
absFilePath := filepath.Join(taskDir, file)
time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)

View File

@@ -1,51 +1,14 @@
package driver
import (
"io"
"log"
"net"
"net/rpc"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/nomad/structs"
)
var HandshakeConfig = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE",
MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255",
}
func GetPluginMap(w io.Writer) map[string]plugin.Plugin {
p := new(ExecutorPlugin)
p.logger = log.New(w, "", log.LstdFlags)
return map[string]plugin.Plugin{"executor": p}
}
// ExecutorReattachConfig is the config that we serialize and deserialize and
// store on disk
type ExecutorReattachConfig struct {
Pid int
AddrNet string
AddrName string
}
// PluginConfig returns a config from an ExecutorReattachConfig
func (c *ExecutorReattachConfig) PluginConfig() *plugin.ReattachConfig {
var addr net.Addr
switch c.AddrNet {
case "unix", "unixgram", "unixpacket":
addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName)
case "tcp", "tcp4", "tcp6":
addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName)
}
return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr}
}
func NewExecutorReattachConfig(c *plugin.ReattachConfig) *ExecutorReattachConfig {
return &ExecutorReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()}
}
type ExecutorRPC struct {
client *rpc.Client
}
@@ -76,6 +39,10 @@ func (e *ExecutorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error {
return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{}))
}
type ExecutorRPCServer struct {
Impl executor.Executor
}
@@ -104,6 +71,10 @@ func (e *ExecutorRPCServer) Exit(args interface{}, resp *interface{}) error {
return e.Impl.Exit()
}
func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *interface{}) error {
return e.Impl.UpdateLogConfig(args)
}
type ExecutorPlugin struct {
logger *log.Logger
Impl *ExecutorRPCServer

View File

@@ -167,9 +167,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
FSIsolation: true,
ResourceLimits: true,
UnprivilegedUser: true,
LogConfig: task.LogConfig,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx)
if err != nil {
@@ -198,7 +197,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type javaId struct {
KillTimeout time.Duration
PluginConfig *ExecutorReattachConfig
PluginConfig *PluginReattachConfig
IsolationConfig *executor.IsolationConfig
TaskDir string
AllocDir *allocdir.AllocDir
@@ -255,7 +254,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
func (h *javaHandle) ID() string {
id := javaId{
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
AllocDir: h.allocDir,
@@ -276,6 +275,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.executor.UpdateLogConfig(task.LogConfig)
// Update is not possible
return nil

View File

@@ -58,6 +58,10 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
"jvm_options": []string{"-Xmx64m", "-Xms32m"},
"checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -103,6 +107,10 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
"checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
@@ -148,6 +156,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
Config: map[string]interface{}{
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}

View File

@@ -0,0 +1,185 @@
package logcollector
import (
"fmt"
"io"
"log"
s1 "log/syslog"
"net"
"path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/logrotator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mcuadros/go-syslog"
)
// LogCollectorContext holds context to configure the syslog server
type LogCollectorContext struct {
// TaskName is the name of the Task
TaskName string
// AllocDir is the handle to do operations on the alloc dir of
// the task
AllocDir *allocdir.AllocDir
// LogConfig provides configuration related to log rotation
LogConfig *structs.LogConfig
// PortUpperBound is the upper bound of the ports that we can use to start
// the syslog server
PortUpperBound uint
// PortLowerBound is the lower bound of the ports that we can use to start
// the syslog server
PortLowerBound uint
}
// SyslogCollectorState holds the address and isolation information of a launched
// syslog server
type SyslogCollectorState struct {
IsolationConfig *executor.IsolationConfig
Addr string
}
// LogCollector is an interface which allows a driver to launch a log server
// and update log configuration
type LogCollector interface {
LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error)
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
}
// SyslogCollector is a LogCollector which starts a syslog server and rotates
// the incoming log stream
type SyslogCollector struct {
addr net.Addr
logConfig *structs.LogConfig
ctx *LogCollectorContext
lro *logrotator.LogRotator
lre *logrotator.LogRotator
server *syslog.Server
taskDir string
logger *log.Logger
}
// NewSyslogCollector returns a new SyslogCollector
func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
return &SyslogCollector{logger: logger}
}
// LaunchCollector launches a new syslog server and starts writing incoming
// log lines to rotated files
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound)
if err != nil {
return nil, err
}
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", addr)
s.ctx = ctx
// configuring the task dir
if err := s.configureTaskDir(); err != nil {
return nil, err
}
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
s.server = syslog.NewServer()
s.server.SetFormat(&CustomParser{logger: s.logger})
s.server.SetHandler(handler)
s.server.ListenTCP(addr.String())
if err := s.server.Boot(); err != nil {
return nil, err
}
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
ro, wo := io.Pipe()
lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lro = lro
go lro.Start(ro)
re, we := io.Pipe()
lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lre = lre
go lre.Start(re)
go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
s := logParts["severity"].(Priority)
if s.Severity == s1.LOG_ERR {
we.Write(logParts["content"].([]byte))
} else {
wo.Write(logParts["content"].([]byte))
}
wo.Write([]byte("\n"))
}
}(channel)
go s.server.Wait()
return &SyslogCollectorState{Addr: addr.String()}, nil
}
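A quick way to exercise the routing is to speak syslog at the returned address. A hypothetical smoke test, assuming a collector already launched via LaunchCollector returned ss:

    conn, err := net.Dial("tcp", ss.Addr)
    if err != nil {
        return err
    }
    // PRI 30 decodes to severity 6 (info), so this line lands in the
    // task's stdout files; a severity of 3 (err) would go to stderr.
    fmt.Fprintf(conn, "<30>2016-02-10T10:16:43-08:00 host docker/abc123[42]: hello\n")
    conn.Close()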
// Exit kills the syslog server
func (s *SyslogCollector) Exit() error {
return s.server.Kill()
}
// UpdateLogConfig updates the log configuration
func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error {
s.ctx.LogConfig = logConfig
if s.lro == nil {
return fmt.Errorf("log rotator for stdout doesn't exist")
}
s.lro.MaxFiles = logConfig.MaxFiles
s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
if s.lre == nil {
return fmt.Errorf("log rotator for stderr doesn't exist")
}
s.lre.MaxFiles = logConfig.MaxFiles
s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
return nil
}
// configureTaskDir sets the task dir in the SyslogCollector
func (s *SyslogCollector) configureTaskDir() error {
taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName]
if !ok {
return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName)
}
s.taskDir = taskDir
return nil
}
// getFreePort returns a free port between the lower and upper bounds that is
// ready to be listened on
func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
return nil, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
continue
}
defer l.Close()
return l.Addr(), nil
}
return nil, fmt.Errorf("No free port found")
}

View File

@@ -0,0 +1,159 @@
package logcollector
import (
"bufio"
"fmt"
"log"
"log/syslog"
"strconv"
"time"
"github.com/jeromer/syslogparser"
)
// Errors related to parsing priority
var (
ErrPriorityNoStart = fmt.Errorf("No start char found for priority")
ErrPriorityEmpty = fmt.Errorf("Priority field empty")
ErrPriorityNoEnd = fmt.Errorf("No end char found for priority")
ErrPriorityTooShort = fmt.Errorf("Priority field too short")
ErrPriorityTooLong = fmt.Errorf("Priority field too long")
ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority")
)
// Priority header and ending characters
const (
PRI_PART_START = '<'
PRI_PART_END = '>'
)
// Priority holds all the priority bits in a syslog log line
type Priority struct {
Pri int
Facility syslog.Priority
Severity syslog.Priority
}
// DockerLogParser parses a log line that the docker daemon ships
type DockerLogParser struct {
line []byte
content []byte
severity Priority
log *log.Logger
}
// NewDockerLogParser creates a new DockerLogParser
func NewDockerLogParser(line []byte) *DockerLogParser {
return &DockerLogParser{line: line}
}
// Parse parses a syslog log line
func (d *DockerLogParser) Parse() error {
severity, _, _ := d.parsePriority(d.line)
msgIdx := d.logContentIndex(d.line)
d.severity = severity
d.content = d.line[msgIdx:]
return nil
}
// Dump creates a map of the parsed log line and severity
func (d *DockerLogParser) Dump() syslogparser.LogParts {
return map[string]interface{}{
"content": d.content,
"severity": d.severity,
}
}
// logContentIndex finds the start index of the message content in a
// syslog line
func (d *DockerLogParser) logContentIndex(line []byte) int {
cursor := 0
numSpace := 0
for i := 0; i < len(line); i++ {
if line[i] == ' ' {
numSpace += 1
if numSpace == 1 {
cursor = i
break
}
}
}
for i := cursor; i < len(line); i++ {
if line[i] == ':' {
cursor = i
break
}
}
return cursor + 1
}
// parsePriority parses the priority in a syslog message
func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) {
cursor := 0
pri := d.newPriority(0)
if len(line) <= 0 {
return pri, cursor, ErrPriorityEmpty
}
if line[cursor] != PRI_PART_START {
return pri, cursor, ErrPriorityNoStart
}
i := 1
priDigit := 0
for i < len(line) {
if i >= 5 {
return pri, cursor, ErrPriorityTooLong
}
c := line[i]
if c == PRI_PART_END {
if i == 1 {
return pri, cursor, ErrPriorityTooShort
}
cursor = i + 1
return d.newPriority(priDigit), cursor, nil
}
if d.isDigit(c) {
v, e := strconv.Atoi(string(c))
if e != nil {
return pri, cursor, e
}
priDigit = (priDigit * 10) + v
} else {
return pri, cursor, ErrPriorityNonDigit
}
i++
}
return pri, cursor, ErrPriorityNoEnd
}
// isDigit checks if a byte is a numeric char
func (d *DockerLogParser) isDigit(c byte) bool {
return c >= '0' && c <= '9'
}
// newPriority creates a new default priority
func (d *DockerLogParser) newPriority(p int) Priority {
// The Priority value is calculated by first multiplying the Facility
// number by 8 and then adding the numerical value of the Severity.
return Priority{
Pri: p,
Facility: syslog.Priority(p / 8),
Severity: syslog.Priority(p % 8),
}
}
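A worked example of that encoding: the test fixture later in this diff begins with "<30>", and 30 = 3*8 + 6, so the facility is 3 (daemon) and the severity is 6, which matches syslog.LOG_INFO:

    pri := 30
    fmt.Println(pri/8, pri%8)                              // 3 6
    fmt.Println(syslog.Priority(pri%8) == syslog.LOG_INFO) // true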
func (d *DockerLogParser) Location(location *time.Location) {
}
// CustomParser parses docker syslog lines
type CustomParser struct {
logger *log.Logger
}
func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser {
return NewDockerLogParser(line)
}
func (c *CustomParser) GetSplitFunc() bufio.SplitFunc {
return nil
}

View File

@@ -0,0 +1,24 @@
package logcollector
import (
"log/syslog"
"testing"
)
func TestLogParser_Priority(t *testing.T) {
line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf")
d := NewDockerLogParser(line)
p, _, err := d.parsePriority(line)
if err != nil {
t.Fatalf("got an err: %v", err)
}
if p.Severity != syslog.LOG_INFO {
t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.Severity)
}
idx := d.logContentIndex(line)
expected := 68
if idx != expected {
t.Fatalf("expected idx: %v, got: %v", expected, idx)
}
}

View File

@@ -1,4 +1,4 @@
package driver
package logrotator
import (
"fmt"
@@ -18,14 +18,16 @@ const (
// LogRotator ingests data and writes out to a rotated set of files
type LogRotator struct {
maxFiles int // maximum number of rotated files retained by the log rotator
fileSize int64 // maximum file size of a rotated file
MaxFiles int // maximum number of rotated files retained by the log rotator
FileSize int64 // maximum file size of a rotated file
path string // path where the rotated files are created
fileName string // base file name of the rotated files
logFileIdx int // index to the current file
logFileIdx int // index to the current file
oldestLogFileIdx int // index to the oldest log file
logger *log.Logger
logger *log.Logger
purgeCh chan struct{}
}
// NewLogRotator configures and returns a new LogRotator
@@ -51,14 +53,18 @@ func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, l
}
}
return &LogRotator{
maxFiles: maxFiles,
fileSize: fileSize,
lr := &LogRotator{
MaxFiles: maxFiles,
FileSize: fileSize,
path: path,
fileName: fileName,
logFileIdx: logFileIdx,
logger: logger,
}, nil
purgeCh: make(chan struct{}, 1),
}
go lr.PurgeOldFiles()
return lr, nil
}
// Start reads from a Reader, writes the data to files, and rotates them when the
@@ -67,15 +73,15 @@ func (l *LogRotator) Start(r io.Reader) error {
buf := make([]byte, bufSize)
for {
logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx))
remainingSize := l.fileSize
var fileSize int64
if f, err := os.Stat(logFileName); err == nil {
// Skipping the current file if it happens to be a directory
if f.IsDir() {
l.logFileIdx += 1
continue
}
fileSize = f.Size()
// Calculating the remaining capacity of the log file
remainingSize = l.fileSize - f.Size()
}
f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
@@ -84,17 +90,23 @@ func (l *LogRotator) Start(r io.Reader) error {
l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName)
// Closing the current log file if it doesn't have any more capacity
if remainingSize <= 0 {
l.logFileIdx = l.logFileIdx + 1
if fileSize >= l.FileSize {
l.logFileIdx += 1
f.Close()
continue
}
// Reading from the reader and writing into the current log file as long
// as it has capacity or the reader closes
totalWritten := 0
for {
if l.FileSize-(fileSize+int64(totalWritten)) < 1 {
f.Close()
break
}
var nr int
var err error
remainingSize := l.FileSize - (int64(totalWritten) + fileSize)
if remainingSize < bufSize {
nr, err = r.Read(buf[0:remainingSize])
} else {
@@ -113,13 +125,16 @@ func (l *LogRotator) Start(r io.Reader) error {
f.Close()
return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw)
}
remainingSize -= int64(nr)
if remainingSize < 1 {
f.Close()
break
}
totalWritten += nr
}
l.logFileIdx = l.logFileIdx + 1
// Purge old files if we have more files than MaxFiles
if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles {
select {
case l.purgeCh <- struct{}{}:
default:
}
}
}
return nil
}
@@ -127,29 +142,36 @@ func (l *LogRotator) Start(r io.Reader) error {
// PurgeOldFiles removes older files and keeps only the last N files rotated for
// a file
func (l *LogRotator) PurgeOldFiles() {
var fIndexes []int
files, err := ioutil.ReadDir(l.path)
if err != nil {
return
}
// Inserting all the rotated files in a slice
for _, f := range files {
if strings.HasPrefix(f.Name(), l.fileName) {
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
n, err := strconv.Atoi(fileIdx)
for {
select {
case <-l.purgeCh:
var fIndexes []int
files, err := ioutil.ReadDir(l.path)
if err != nil {
continue
return
}
fIndexes = append(fIndexes, n)
// Inserting all the rotated files in a slice
for _, f := range files {
if strings.HasPrefix(f.Name(), l.fileName) {
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
fIndexes = append(fIndexes, n)
}
}
// Sorting the file indexes so that we can purge the older files and keep
// only the number of files as configured by the user
sort.Sort(sort.IntSlice(fIndexes))
var toDelete []int
toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles]
for _, fIndex := range toDelete {
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
os.RemoveAll(fname)
}
l.oldestLogFileIdx = fIndexes[0]
}
}
// Sorting the file indexes so that we can purge the older files and keep
// only the number of files as configured by the user
sort.Sort(sort.IntSlice(fIndexes))
toDelete := fIndexes[l.maxFiles-1 : len(fIndexes)-1]
for _, fIndex := range toDelete {
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
os.RemoveAll(fname)
}
}
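Purging is deliberately asynchronous: Start does a non-blocking send on purgeCh only once the spread between the newest and oldest index reaches MaxFiles, and this goroutine coalesces any signals that arrive while a purge is in flight (purgeCh is buffered with capacity 1), so the hot write path never blocks on a directory scan.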

View File

@@ -1,12 +1,14 @@
package driver
package logrotator
import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
)
var (
@@ -228,7 +230,10 @@ func TestLogRotator_PurgeDirs(t *testing.T) {
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefghijklmno"))
w.Write([]byte("abcdefghijklmnopqrxyz"))
time.Sleep(1 * time.Second)
l.MaxFiles = 1
w.Write([]byte("abcdefghijklmnopqrxyz"))
w.Close()
}()
@@ -236,13 +241,56 @@ func TestLogRotator_PurgeDirs(t *testing.T) {
if err != nil && err != io.EOF {
t.Fatalf("failure in logrotator start: %v", err)
}
l.PurgeOldFiles()
// sleeping for a second because purging is async
time.Sleep(1 * time.Second)
files, err := ioutil.ReadDir(path)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(files) != 2 {
t.Fatalf("expected number of files: %v, actual: %v", 2, len(files))
expected := 1
if len(files) != expected {
t.Fatalf("expected number of files: %v, actual: %v", expected, len(files))
}
}
func TestLogRotator_UpdateConfig(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefg"))
l.FileSize = 5
w.Write([]byte("hijklmnojkp"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0"))
finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1"))
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 10 {
t.Fatalf("expected size of file: %v, actual: %v", 10, finfo.Size())
}
if err1 != nil {
t.Fatal(err1)
}
if finfo1.Size() != 5 {
t.Fatalf("expected size of file: %v, actual: %v", 5, finfo1.Size())
}
}

client/driver/plugins.go (new file, 51 lines)
View File

@@ -0,0 +1,51 @@
package driver
import (
"io"
"log"
"net"
"github.com/hashicorp/go-plugin"
)
var HandshakeConfig = plugin.HandshakeConfig{
ProtocolVersion: 1,
MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE",
MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255",
}
func GetPluginMap(w io.Writer) map[string]plugin.Plugin {
e := new(ExecutorPlugin)
e.logger = log.New(w, "", log.LstdFlags)
s := new(SyslogCollectorPlugin)
s.logger = log.New(w, "", log.LstdFlags)
return map[string]plugin.Plugin{
"executor": e,
"syslogcollector": s,
}
}
// PluginReattachConfig is the config that we serialize and deserialize and
// store on disk
type PluginReattachConfig struct {
Pid int
AddrNet string
AddrName string
}
// PluginConfig returns a plugin.ReattachConfig from a PluginReattachConfig
func (c *PluginReattachConfig) PluginConfig() *plugin.ReattachConfig {
var addr net.Addr
switch c.AddrNet {
case "unix", "unixgram", "unixpacket":
addr, _ = net.ResolveUnixAddr(c.AddrNet, c.AddrName)
case "tcp", "tcp4", "tcp6":
addr, _ = net.ResolveTCPAddr(c.AddrNet, c.AddrName)
}
return &plugin.ReattachConfig{Pid: c.Pid, Addr: addr}
}
func NewPluginReattachConfig(c *plugin.ReattachConfig) *PluginReattachConfig {
return &PluginReattachConfig{Pid: c.Pid, AddrNet: c.Addr.Network(), AddrName: c.Addr.String()}
}
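This round trip is what makes the drivers' Open methods work across client restarts: the handle ID embeds the serialized reattach info, and Open rebuilds a live plugin connection from it. A minimal sketch, assuming the types above plus encoding/json:

    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4200")
    stored := NewPluginReattachConfig(&plugin.ReattachConfig{Pid: 4321, Addr: addr})
    data, _ := json.Marshal(stored) // what gets embedded into the driver handle ID
    var restored PluginReattachConfig
    json.Unmarshal(data, &restored)
    cfg := &plugin.ClientConfig{Reattach: restored.PluginConfig()} // handed to go-plugin on Open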

View File

@@ -208,6 +208,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
LogConfig: task.LogConfig,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx)
if err != nil {
@@ -235,7 +236,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type qemuId struct {
KillTimeout time.Duration
UserPid int
PluginConfig *ExecutorReattachConfig
PluginConfig *PluginReattachConfig
AllocDir *allocdir.AllocDir
}
@@ -276,7 +277,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
func (h *qemuHandle) ID() string {
id := qemuId{
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}
@@ -295,6 +296,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
func (h *qemuHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.executor.UpdateLogConfig(task.LogConfig)
// Update is not possible
return nil

View File

@@ -49,6 +49,10 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) {
"web": 8080,
}},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 512,
@@ -101,6 +105,10 @@ func TestQemuDriver_RequiresMemory(t *testing.T) {
"checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544",
// ssh u/p would be here
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
}
driverCtx, execCtx := testDriverContexts(task)

View File

@@ -114,6 +114,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
LogConfig: task.LogConfig,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
if err != nil {
@@ -140,7 +141,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
type rawExecId struct {
KillTimeout time.Duration
UserPid int
PluginConfig *ExecutorReattachConfig
PluginConfig *PluginReattachConfig
AllocDir *allocdir.AllocDir
}
@@ -180,7 +181,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
func (h *rawExecHandle) ID() string {
id := rawExecId{
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}
@@ -199,6 +200,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
func (h *rawExecHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.executor.UpdateLogConfig(task.LogConfig)
// Update is not possible
return nil

View File

@@ -61,6 +61,10 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
"command": testtask.Path(),
"args": []string{"sleep", "1s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
@@ -109,6 +113,10 @@ func TestRawExecDriver_Start_Artifact_basic(t *testing.T) {
"command": file,
"args": []string{"sleep", "1s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
@@ -156,6 +164,10 @@ func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) {
"command": file,
"args": []string{"sleep", "1s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
@@ -197,6 +209,10 @@ func TestRawExecDriver_Start_Wait(t *testing.T) {
"command": testtask.Path(),
"args": []string{"sleep", "1s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
@@ -243,6 +259,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
"write", string(exp), outPath,
},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
@@ -289,6 +309,10 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
"command": testtask.Path(),
"args": []string{"sleep", "45s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)

View File

@@ -0,0 +1,69 @@
package driver
import (
"log"
"net/rpc"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/logcollector"
"github.com/hashicorp/nomad/nomad/structs"
)
type SyslogCollectorRPC struct {
client *rpc.Client
}
type LaunchCollectorArgs struct {
Ctx *logcollector.LogCollectorContext
}
func (e *SyslogCollectorRPC) LaunchCollector(ctx *logcollector.LogCollectorContext) (*logcollector.SyslogCollectorState, error) {
var ss *logcollector.SyslogCollectorState
err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss)
return ss, err
}
func (e *SyslogCollectorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error {
return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{}))
}
type SyslogCollectorRPCServer struct {
Impl logcollector.LogCollector
}
func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs,
resp *logcollector.SyslogCollectorState) error {
ss, err := s.Impl.LaunchCollector(args.Ctx)
if ss != nil {
*resp = *ss
}
return err
}
func (s *SyslogCollectorRPCServer) Exit(args interface{}, resp *interface{}) error {
return s.Impl.Exit()
}
func (s *SyslogCollectorRPCServer) UpdateLogConfig(logConfig *structs.LogConfig, resp *interface{}) error {
return s.Impl.UpdateLogConfig(logConfig)
}
type SyslogCollectorPlugin struct {
logger *log.Logger
Impl *SyslogCollectorRPCServer
}
func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
p.Impl = &SyslogCollectorRPCServer{Impl: logcollector.NewSyslogCollector(p.logger)}
}
return p.Impl, nil
}
func (p *SyslogCollectorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &SyslogCollectorRPC{client: c}, nil
}

View File

@@ -9,11 +9,13 @@ import (
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/logcollector"
)
// createExecutor launches an executor plugin and returns an instance of the
// Executor interface
func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) {
func createExecutor(config *plugin.ClientConfig, w io.Writer,
clientConfig *config.Config) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
@@ -39,6 +41,30 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *conf
return executorPlugin, executorClient, nil
}
func createLogCollector(config *plugin.ClientConfig, w io.Writer,
clientConfig *config.Config) (logcollector.LogCollector, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
config.MinPort = clientConfig.ClientMinPort
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
syslogClient := plugin.NewClient(config)
rpcClient, err := syslogClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for syslog plugin: %v", err)
}
raw, err := rpcClient.Dispense("syslogcollector")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err)
}
logCollector := raw.(logcollector.LogCollector)
return logCollector, syslogClient, nil
}
// killProcess kills a process with the given pid
func killProcess(pid int) error {
proc, err := os.FindProcess(pid)

View File

@@ -158,7 +158,13 @@ job "example" {
}
}
}
# Specify configuration related to log rotation
# logs {
# max_files = 10
# max_file_size = 15
# }
# Controls the timeout between signalling a task it will be killed
# and killing the task. If not set a default is used.
# kill_timeout = "20s"

command/syslog_plugin.go (new file, 43 lines)
View File

@@ -0,0 +1,43 @@
package command
import (
"os"
"strings"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver"
)
type SyslogPluginCommand struct {
Meta
}
func (e *SyslogPluginCommand) Help() string {
helpText := `
This is a command used by Nomad internally to launch a syslog collector"
`
return strings.TrimSpace(helpText)
}
func (s *SyslogPluginCommand) Synopsis() string {
return "internal - lanch a syslog collector plugin"
}
func (s *SyslogPluginCommand) Run(args []string) int {
if len(args) == 0 {
s.Ui.Error("log output file isn't provided")
return 1
}
logFileName := args[0]
stdo, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
s.Ui.Error(err.Error())
return 1
}
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: driver.HandshakeConfig,
Plugins: driver.GetPluginMap(stdo),
})
return 0
}
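As wired in the Docker driver earlier in this diff, operators never run this subcommand themselves: the client spawns it as "nomad syslog <plugin-log-file>" via exec.Command, and go-plugin's handshake then connects the two processes through the "syslogcollector" entry in GetPluginMap.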

View File

@@ -44,6 +44,10 @@ func testJob(jobID string) *api.Job {
MemoryMB: 256,
DiskMB: 20,
CPU: 100,
}).
SetLogConfig(&api.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
})
group := api.NewTaskGroup("group1", 1).

View File

@@ -105,7 +105,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"syslog": func() (cli.Command, error) {
return &command.SyslogPluginCommand{
Meta: meta,
}, nil
},
"server-force-leave": func() (cli.Command, error) {
return &command.ServerForceLeaveCommand{
Meta: meta,

View File

@@ -401,6 +401,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
delete(m, "service")
delete(m, "meta")
delete(m, "resources")
delete(m, "logs")
// Build the task
var t structs.Task
@@ -484,6 +485,24 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
t.Resources = &r
}
// If we have logs then parse that
logConfig := structs.DefaultLogConfig()
if o := listVal.Filter("logs"); len(o.Items) > 0 {
if len(o.Items) > 1 {
return fmt.Errorf("only one logs block is allowed in a Task. Number of logs block found: %d", len(o.Items))
}
var m map[string]interface{}
logsBlock := o.Items[0]
if err := hcl.DecodeObject(&m, logsBlock.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &logConfig); err != nil {
return err
}
}
t.LogConfig = logConfig
*result = append(*result, &t)
}

View File

@@ -58,6 +58,10 @@ func TestParse(t *testing.T) {
Meta: map[string]string{
"my-cool-key": "foobar",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
},
},
},
@@ -123,6 +127,10 @@ func TestParse(t *testing.T) {
},
},
KillTimeout: 22 * time.Second,
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 100,
},
},
&structs.Task{
Name: "storagelocker",
@@ -143,6 +151,10 @@ func TestParse(t *testing.T) {
Operand: "=",
},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
},
},
},
@@ -284,6 +296,10 @@ func TestParse(t *testing.T) {
},
},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
},
},
},

View File

@@ -42,6 +42,10 @@ job "binstore-storagelocker" {
config {
image = "hashicorp/binstore"
}
logs {
max_files = 10
max_file_size = 100
}
env {
HELLO = "world"
LOREM = "ipsum"

View File

@@ -33,6 +33,7 @@ func RunCustom(args []string, commands map[string]cli.CommandFactory) int {
for k, _ := range commands {
switch k {
case "executor":
case "syslog":
default:
commandsInclude = append(commandsInclude, k)
}

View File

@@ -102,6 +102,7 @@ func Job() *structs.Job {
PortLabel: "admin",
},
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
@@ -177,6 +178,7 @@ func SystemJob() *structs.Job {
},
},
},
LogConfig: structs.DefaultLogConfig(),
},
},
},

View File

@@ -1533,6 +1533,32 @@ const (
DefaultKillTimeout = 5 * time.Second
)
// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles int `mapstructure:"max_files"`
MaxFileSizeMB int `mapstructure:"max_file_size"`
}
func DefaultLogConfig() *LogConfig {
return &LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
}
}
// Validate returns an error if the specified log config values are less than
// the minimum allowed.
func (l *LogConfig) Validate() error {
var mErr multierror.Error
if l.MaxFiles < 1 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum number of files is 1; got %d", l.MaxFiles))
}
if l.MaxFileSizeMB < 1 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum file size is 1MB; got %d", l.MaxFileSizeMB))
}
return mErr.ErrorOrNil()
}
// Task is a single process that is typically executed as part of a task group.
type Task struct {
// Name of the task
@@ -1564,6 +1590,9 @@ type Task struct {
// KillTimeout is the time between signaling a task that it will be
// killed and killing it.
KillTimeout time.Duration `mapstructure:"kill_timeout"`
// LogConfig provides configuration for log rotation
LogConfig *LogConfig `mapstructure:"logs"`
}
func (t *Task) Copy() *Task {
@@ -1754,6 +1783,13 @@ func (t *Task) Validate() error {
mErr.Errors = append(mErr.Errors, err)
}
// Validate the log config
if t.LogConfig == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config"))
} else if err := t.LogConfig.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
for idx, constr := range t.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
@@ -1766,6 +1802,10 @@ func (t *Task) Validate() error {
mErr.Errors = append(mErr.Errors, err)
}
}
if t.Resources.DiskMB <= (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) {
mErr.Errors = append(mErr.Errors, fmt.Errorf("log storage exceeds requested disk capacity"))
}
return mErr.ErrorOrNil()
}
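A worked example of the capacity check: with DefaultLogConfig (10 files of 10 MB each) a task can accumulate up to 100 MB of logs, so any DiskMB of 100 or less fails validation. A sketch, assuming the types above:

    task := &Task{
        LogConfig: DefaultLogConfig(),      // 10 * 10 MB = 100 MB of potential logs
        Resources: &Resources{DiskMB: 100}, // 100 <= 10*10, so validation fails
    }
    err := task.Validate() // includes "log storage exceeds requested disk capacity"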

View File

@@ -258,6 +258,7 @@ func TestTask_Validate(t *testing.T) {
MemoryMB: 100,
IOPS: 10,
},
LogConfig: DefaultLogConfig(),
}
err = task.Validate()
if err != nil {
@@ -265,6 +266,21 @@ func TestTask_Validate(t *testing.T) {
}
}
func TestTask_Validate_LogConfig(t *testing.T) {
task := &Task{
LogConfig: DefaultLogConfig(),
Resources: &Resources{
DiskMB: 1,
},
}
err := task.Validate()
mErr := err.(*multierror.Error)
if !strings.Contains(mErr.Errors[3].Error(), "log storage") {
t.Fatalf("err: %s", err)
}
}
func TestConstraint_Validate(t *testing.T) {
c := &Constraint{}
err := c.Validate()