Added implementation to run checks for docker, exec and raw_exec

This commit is contained in:
Diptanu Choudhury
2016-03-24 00:13:30 -07:00
parent 2d472734a3
commit 4faf16b5c0
5 changed files with 312 additions and 0 deletions

View File

@@ -0,0 +1,237 @@
package executor
import (
"container/heap"
"fmt"
"log"
"os/exec"
"syscall"
"time"
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
type Check interface {
Run() *cstructs.CheckResult
ID() string
}
type DockerScriptCheck struct {
id string
containerID string
client *docker.Client
logger *log.Logger
script []string
}
func (d *DockerScriptCheck) Run() *cstructs.CheckResult {
execOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: d.script,
Container: d.containerID,
}
var (
exec *docker.Exec
err error
execRes *docker.ExecInspect
time = time.Now()
)
if exec, err = d.client.CreateExec(execOpts); err != nil {
return &cstructs.CheckResult{Err: err}
}
output, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize))
startOpts := docker.StartExecOptions{
Detach: false,
Tty: false,
OutputStream: output,
ErrorStream: output,
}
if err = d.client.StartExec(exec.ID, startOpts); err != nil {
return &cstructs.CheckResult{Err: err}
}
if execRes, err = d.client.InspectExec(exec.ID); err != nil {
return &cstructs.CheckResult{Err: err}
}
return &cstructs.CheckResult{
ExitCode: execRes.ExitCode,
Output: string(output.Bytes()),
Timestamp: time,
}
}
func (d *DockerScriptCheck) ID() string {
return d.id
}
type ExecScriptCheck struct {
id string
cmd string
args []string
taskDir string
ctx *ExecutorContext
FSIsolation bool
}
func (e *ExecScriptCheck) Run() *cstructs.CheckResult {
buf, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize))
cmd := exec.Command(e.cmd, e.args...)
cmd.Stdout = buf
cmd.Stderr = buf
e.setChroot(cmd)
ts := time.Now()
if err := cmd.Start(); err != nil {
return &cstructs.CheckResult{Err: err}
}
errCh := make(chan error, 2)
go func() {
errCh <- cmd.Wait()
}()
for {
select {
case err := <-errCh:
if err == nil {
return &cstructs.CheckResult{ExitCode: 0, Output: string(buf.Bytes()), Timestamp: ts}
}
exitCode := 1
if exitErr, ok := err.(*exec.ExitError); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
exitCode = status.ExitStatus()
}
}
return &cstructs.CheckResult{ExitCode: exitCode, Output: string(buf.Bytes()), Timestamp: ts}
case <-time.After(30 * time.Second):
errCh <- fmt.Errorf("timed out after waiting 30s")
}
}
return nil
}
func (e *ExecScriptCheck) ID() string {
return e.id
}
type consulCheck struct {
check Check
next time.Time
index int
}
type checkHeap struct {
index map[string]*consulCheck
heap checksHeapImp
}
func NewConsulChecksHeap() *checkHeap {
return &checkHeap{
index: make(map[string]*consulCheck),
heap: make(checksHeapImp, 0),
}
}
func (c *checkHeap) Push(check Check, next time.Time) error {
if _, ok := c.index[check.ID()]; ok {
return fmt.Errorf("check %v already exists", check.ID())
}
cCheck := &consulCheck{check, next, 0}
c.index[check.ID()] = cCheck
heap.Push(&c.heap, cCheck)
return nil
}
func (c *checkHeap) Pop() *consulCheck {
if len(c.heap) == 0 {
return nil
}
cCheck := heap.Pop(&c.heap).(*consulCheck)
delete(c.index, cCheck.check.ID())
return cCheck
}
func (c *checkHeap) Peek() *consulCheck {
if len(c.heap) == 0 {
return nil
}
return c.heap[0]
}
func (c *checkHeap) Contains(check Check) bool {
_, ok := c.index[check.ID()]
return ok
}
func (c *checkHeap) Update(check Check, next time.Time) error {
if cCheck, ok := c.index[check.ID()]; ok {
cCheck.check = check
cCheck.next = next
heap.Fix(&c.heap, cCheck.index)
return nil
}
return fmt.Errorf("heap doesn't contain check %v", check.ID())
}
func (c *checkHeap) Remove(check Check) error {
if cCheck, ok := c.index[check.ID()]; ok {
heap.Remove(&c.heap, cCheck.index)
delete(c.index, check.ID())
return nil
}
return fmt.Errorf("heap doesn't contain check %v", check.ID())
}
func (c *checkHeap) Len() int { return len(c.heap) }
type checksHeapImp []*consulCheck
func (h checksHeapImp) Len() int { return len(h) }
func (h checksHeapImp) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
// Sort such that zero times are at the end of the list.
iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero()
if iZero && jZero {
return false
} else if iZero {
return false
} else if jZero {
return true
}
return h[i].next.Before(h[j].next)
}
func (h checksHeapImp) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *checksHeapImp) Push(x interface{}) {
n := len(*h)
check := x.(*consulCheck)
check.index = n
*h = append(*h, check)
}
func (h *checksHeapImp) Pop() interface{} {
old := *h
n := len(old)
check := old[n-1]
check.index = -1 // for safety
*h = old[0 : n-1]
return check
}

View File

@@ -0,0 +1,10 @@
// +build !linux
package executor
import (
"os/exec"
)
func (e *ExecScriptCheck) setChroot(cmd *exec.Cmd) {
}

View File

@@ -0,0 +1,16 @@
package executor
import (
"os/exec"
"syscall"
)
func (e *ExecScriptCheck) setChroot(cmd *exec.Cmd) {
if e.FSIsolation {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
}
cmd.SysProcAttr.Chroot = e.taskDir
cmd.Dir = "/"
}

View File

@@ -0,0 +1,37 @@
package executor
import (
"reflect"
"testing"
"time"
)
func TestCheckHeapOrder(t *testing.T) {
h := NewConsulChecksHeap()
c1 := ExecScriptCheck{id: "a"}
c2 := ExecScriptCheck{id: "b"}
c3 := ExecScriptCheck{id: "c"}
lookup := map[Check]string{
&c1: "c1",
&c2: "c2",
&c3: "c3",
}
h.Push(&c1, time.Time{})
h.Push(&c2, time.Unix(10, 0))
h.Push(&c3, time.Unix(11, 0))
expected := []string{"c2", "c3", "c1"}
var actual []string
for i := 0; i < 3; i++ {
cCheck := h.Pop()
actual = append(actual, lookup[cCheck.check])
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Wrong ordering; got %v; want %v", actual, expected)
}
}

View File

@@ -2,6 +2,7 @@ package structs
import (
"fmt"
"time"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
@@ -9,6 +10,9 @@ import (
const (
// The default user that the executor uses to run tasks
DefaultUnpriviledgedUser = "nobody"
// CheckBufSize is the size of the check output result
CheckBufSize = 4 * 1024
)
// WaitResult stores the result of a Wait operation.
@@ -60,3 +64,11 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError {
func (r *RecoverableError) Error() string {
return r.Err.Error()
}
// CheckResult encapsulates the result of a check
type CheckResult struct {
ExitCode int
Output string
Timestamp time.Time
Err error
}