From 4faf16b5c01f549d5c0901cd7e23decc65ef7036 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 24 Mar 2016 00:13:30 -0700 Subject: [PATCH] Added implementation to run checks for docker, exec and raw_exec --- client/driver/executor/checks.go | 237 +++++++++++++++++++++++++ client/driver/executor/checks_basic.go | 10 ++ client/driver/executor/checks_linux.go | 16 ++ client/driver/executor/checks_test.go | 37 ++++ client/driver/structs/structs.go | 12 ++ 5 files changed, 312 insertions(+) create mode 100644 client/driver/executor/checks.go create mode 100644 client/driver/executor/checks_basic.go create mode 100644 client/driver/executor/checks_linux.go create mode 100644 client/driver/executor/checks_test.go diff --git a/client/driver/executor/checks.go b/client/driver/executor/checks.go new file mode 100644 index 000000000..dc46d1062 --- /dev/null +++ b/client/driver/executor/checks.go @@ -0,0 +1,237 @@ +package executor + +import ( + "container/heap" + "fmt" + "log" + "os/exec" + "syscall" + "time" + + "github.com/armon/circbuf" + docker "github.com/fsouza/go-dockerclient" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" +) + +type Check interface { + Run() *cstructs.CheckResult + ID() string +} + +type DockerScriptCheck struct { + id string + containerID string + client *docker.Client + logger *log.Logger + script []string +} + +func (d *DockerScriptCheck) Run() *cstructs.CheckResult { + execOpts := docker.CreateExecOptions{ + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + Tty: false, + Cmd: d.script, + Container: d.containerID, + } + var ( + exec *docker.Exec + err error + execRes *docker.ExecInspect + time = time.Now() + ) + if exec, err = d.client.CreateExec(execOpts); err != nil { + return &cstructs.CheckResult{Err: err} + } + + output, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize)) + startOpts := docker.StartExecOptions{ + Detach: false, + Tty: false, + OutputStream: output, + ErrorStream: output, + } + + if err = d.client.StartExec(exec.ID, startOpts); err != nil { + return &cstructs.CheckResult{Err: err} + } + if execRes, err = d.client.InspectExec(exec.ID); err != nil { + return &cstructs.CheckResult{Err: err} + } + return &cstructs.CheckResult{ + ExitCode: execRes.ExitCode, + Output: string(output.Bytes()), + Timestamp: time, + } +} + +func (d *DockerScriptCheck) ID() string { + return d.id +} + +type ExecScriptCheck struct { + id string + cmd string + args []string + taskDir string + + ctx *ExecutorContext + FSIsolation bool +} + +func (e *ExecScriptCheck) Run() *cstructs.CheckResult { + buf, _ := circbuf.NewBuffer(int64(cstructs.CheckBufSize)) + cmd := exec.Command(e.cmd, e.args...) + cmd.Stdout = buf + cmd.Stderr = buf + e.setChroot(cmd) + ts := time.Now() + if err := cmd.Start(); err != nil { + return &cstructs.CheckResult{Err: err} + } + errCh := make(chan error, 2) + go func() { + errCh <- cmd.Wait() + }() + for { + select { + case err := <-errCh: + if err == nil { + return &cstructs.CheckResult{ExitCode: 0, Output: string(buf.Bytes()), Timestamp: ts} + } + exitCode := 1 + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + } + } + return &cstructs.CheckResult{ExitCode: exitCode, Output: string(buf.Bytes()), Timestamp: ts} + case <-time.After(30 * time.Second): + errCh <- fmt.Errorf("timed out after waiting 30s") + } + } + return nil +} + +func (e *ExecScriptCheck) ID() string { + return e.id +} + +type consulCheck struct { + check Check + next time.Time + index int +} + +type checkHeap struct { + index map[string]*consulCheck + heap checksHeapImp +} + +func NewConsulChecksHeap() *checkHeap { + return &checkHeap{ + index: make(map[string]*consulCheck), + heap: make(checksHeapImp, 0), + } +} + +func (c *checkHeap) Push(check Check, next time.Time) error { + if _, ok := c.index[check.ID()]; ok { + return fmt.Errorf("check %v already exists", check.ID()) + } + + cCheck := &consulCheck{check, next, 0} + + c.index[check.ID()] = cCheck + heap.Push(&c.heap, cCheck) + return nil +} + +func (c *checkHeap) Pop() *consulCheck { + if len(c.heap) == 0 { + return nil + } + + cCheck := heap.Pop(&c.heap).(*consulCheck) + delete(c.index, cCheck.check.ID()) + return cCheck +} + +func (c *checkHeap) Peek() *consulCheck { + if len(c.heap) == 0 { + return nil + } + return c.heap[0] +} + +func (c *checkHeap) Contains(check Check) bool { + _, ok := c.index[check.ID()] + return ok +} + +func (c *checkHeap) Update(check Check, next time.Time) error { + if cCheck, ok := c.index[check.ID()]; ok { + cCheck.check = check + cCheck.next = next + heap.Fix(&c.heap, cCheck.index) + return nil + } + + return fmt.Errorf("heap doesn't contain check %v", check.ID()) +} + +func (c *checkHeap) Remove(check Check) error { + if cCheck, ok := c.index[check.ID()]; ok { + heap.Remove(&c.heap, cCheck.index) + delete(c.index, check.ID()) + return nil + } + return fmt.Errorf("heap doesn't contain check %v", check.ID()) +} + +func (c *checkHeap) Len() int { return len(c.heap) } + +type checksHeapImp []*consulCheck + +func (h checksHeapImp) Len() int { return len(h) } + +func (h checksHeapImp) Less(i, j int) bool { + // Two zero times should return false. + // Otherwise, zero is "greater" than any other time. + // (To sort it at the end of the list.) + // Sort such that zero times are at the end of the list. + iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero() + if iZero && jZero { + return false + } else if iZero { + return false + } else if jZero { + return true + } + + return h[i].next.Before(h[j].next) +} + +func (h checksHeapImp) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *checksHeapImp) Push(x interface{}) { + n := len(*h) + check := x.(*consulCheck) + check.index = n + *h = append(*h, check) +} + +func (h *checksHeapImp) Pop() interface{} { + old := *h + n := len(old) + check := old[n-1] + check.index = -1 // for safety + *h = old[0 : n-1] + return check +} diff --git a/client/driver/executor/checks_basic.go b/client/driver/executor/checks_basic.go new file mode 100644 index 000000000..c812bf6cc --- /dev/null +++ b/client/driver/executor/checks_basic.go @@ -0,0 +1,10 @@ +// +build !linux + +package executor + +import ( + "os/exec" +) + +func (e *ExecScriptCheck) setChroot(cmd *exec.Cmd) { +} diff --git a/client/driver/executor/checks_linux.go b/client/driver/executor/checks_linux.go new file mode 100644 index 000000000..d4a684a8b --- /dev/null +++ b/client/driver/executor/checks_linux.go @@ -0,0 +1,16 @@ +package executor + +import ( + "os/exec" + "syscall" +) + +func (e *ExecScriptCheck) setChroot(cmd *exec.Cmd) { + if e.FSIsolation { + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + } + cmd.SysProcAttr.Chroot = e.taskDir + cmd.Dir = "/" +} diff --git a/client/driver/executor/checks_test.go b/client/driver/executor/checks_test.go new file mode 100644 index 000000000..c959e7ee0 --- /dev/null +++ b/client/driver/executor/checks_test.go @@ -0,0 +1,37 @@ +package executor + +import ( + "reflect" + "testing" + "time" +) + +func TestCheckHeapOrder(t *testing.T) { + h := NewConsulChecksHeap() + + c1 := ExecScriptCheck{id: "a"} + c2 := ExecScriptCheck{id: "b"} + c3 := ExecScriptCheck{id: "c"} + + lookup := map[Check]string{ + &c1: "c1", + &c2: "c2", + &c3: "c3", + } + + h.Push(&c1, time.Time{}) + h.Push(&c2, time.Unix(10, 0)) + h.Push(&c3, time.Unix(11, 0)) + + expected := []string{"c2", "c3", "c1"} + var actual []string + for i := 0; i < 3; i++ { + cCheck := h.Pop() + + actual = append(actual, lookup[cCheck.check]) + } + + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("Wrong ordering; got %v; want %v", actual, expected) + } +} diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index a23990427..60d4860eb 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -2,6 +2,7 @@ package structs import ( "fmt" + "time" cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) @@ -9,6 +10,9 @@ import ( const ( // The default user that the executor uses to run tasks DefaultUnpriviledgedUser = "nobody" + + // CheckBufSize is the size of the check output result + CheckBufSize = 4 * 1024 ) // WaitResult stores the result of a Wait operation. @@ -60,3 +64,11 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError { func (r *RecoverableError) Error() string { return r.Err.Error() } + +// CheckResult encapsulates the result of a check +type CheckResult struct { + ExitCode int + Output string + Timestamp time.Time + Err error +}