From 025f3b424a5ae86c92335ac3cb1efa031386cc0e Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 22 Aug 2016 11:34:24 -0500 Subject: [PATCH 1/2] blocking chained allocations until previous allocation hasn't terminated --- client/client.go | 28 ++++++ client/client_test.go | 93 +++++++++++++++++++ client/driver/driver.go | 13 +-- client/driver/mock_driver.go | 168 ++++++++++++++++++++++++++++++++++ client/task_runner.go | 4 +- nomad/structs/structs.go | 10 ++ nomad/structs/structs_test.go | 40 ++++++++ 7 files changed, 349 insertions(+), 7 deletions(-) create mode 100644 client/driver/mock_driver.go diff --git a/client/client.go b/client/client.go index d6fdbb1b0..17015da3c 100644 --- a/client/client.go +++ b/client/client.go @@ -128,6 +128,11 @@ type Client struct { allocs map[string]*AllocRunner allocLock sync.RWMutex + // blockedAllocations are allocations which are blocked because their + // chained allocations haven't finished running + blockedAllocations map[string]*structs.Allocation + blockedAllocsLock sync.RWMutex + // allocUpdates stores allocations that need to be synced to the server. allocUpdates chan *structs.Allocation @@ -155,6 +160,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg logger: logger, hostStatsCollector: stats.NewHostStatsCollector(), allocs: make(map[string]*AllocRunner), + blockedAllocations: make(map[string]*structs.Allocation), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), } @@ -966,6 +972,18 @@ func (c *Client) allocSync() { case alloc := <-c.allocUpdates: // Batch the allocation updates until the timer triggers. updates[alloc.ID] = alloc + + // If this alloc was blocking another alloc and transitioned to a + // terminal state then start the blocked allocation + c.blockedAllocsLock.Lock() + if blockedAlloc, ok := c.blockedAllocations[alloc.ID]; ok && alloc.Terminated() { + if err := c.addAlloc(blockedAlloc); err != nil { + c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", + blockedAlloc.ID, err) + } + delete(c.blockedAllocations, blockedAlloc.PreviousAllocation) + } + c.blockedAllocsLock.Unlock() case <-syncTicker.C: // Fast path if there are no updates if len(updates) == 0 { @@ -1191,6 +1209,16 @@ func (c *Client) runAllocs(update *allocUpdates) { // Start the new allocations for _, add := range diff.added { + // If the allocation is chanined and the previous allocation hasn't + // terminated yet, then add the alloc to the blocked queue. + if ar, ok := c.getAllocRunners()[add.PreviousAllocation]; ok && !ar.Alloc().Terminated() { + c.logger.Printf("[DEBUG] client: added alloc %q to blocked queue", add.ID) + c.blockedAllocsLock.Lock() + c.blockedAllocations[add.PreviousAllocation] = add + c.blockedAllocsLock.Unlock() + continue + } + if err := c.addAlloc(add); err != nil { c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", add.ID, err) diff --git a/client/client_test.go b/client/client_test.go index 1df297271..1bc984bc3 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -598,3 +598,96 @@ func TestClient_Init(t *testing.T) { t.Fatalf("err: %s", err) } } + +func TestClient_BlockedAllocations(t *testing.T) { + s1, _ := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + c1 := testClient(t, func(c *config.Config) { + c.RPCHandler = s1 + }) + + // Wait for the node to be ready + state := s1.State() + testutil.WaitForResult(func() (bool, error) { + out, err := state.NodeByID(c1.Node().ID) + if err != nil { + return false, err + } + if out == nil || out.Status != structs.NodeStatusReady { + return false, fmt.Errorf("bad node: %#v", out) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Add an allocation + alloc := mock.Alloc() + alloc.NodeID = c1.Node().ID + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "kill_after": "1s", + "run_for": "100s", + "exit_code": 0, + "exit_signal": 0, + "exit_err": "", + } + + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) + state.UpsertAllocs(100, []*structs.Allocation{alloc}) + + // Wait until the client downloads and starts the allocation + testutil.WaitForResult(func() (bool, error) { + out, err := state.AllocByID(alloc.ID) + if err != nil { + return false, err + } + if out == nil || out.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("bad alloc: %#v", out) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Add a new chained alloc + alloc2 := alloc.Copy() + alloc2.ID = structs.GenerateUUID() + alloc2.Job = alloc.Job + alloc2.JobID = alloc.JobID + alloc2.PreviousAllocation = alloc.ID + if err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}); err != nil { + t.Fatalf("err: %v", err) + } + + // Enusre that the chained allocation is being tracked as blocked + testutil.WaitForResult(func() (bool, error) { + alloc, ok := c1.blockedAllocations[alloc2.PreviousAllocation] + if ok && alloc.ID == alloc2.ID { + return true, nil + } + return false, fmt.Errorf("no blocked allocations") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Change the desired state of the parent alloc to stop + alloc1 := alloc.Copy() + alloc1.DesiredStatus = structs.AllocDesiredStatusStop + if err := state.UpsertAllocs(300, []*structs.Allocation{alloc1}); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure that there are no blocked allocations + testutil.WaitForResult(func() (bool, error) { + _, ok := c1.blockedAllocations[alloc2.PreviousAllocation] + if ok { + return false, fmt.Errorf("blocked evals present") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/client/driver/driver.go b/client/driver/driver.go index 112626585..cafe42105 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -19,12 +19,13 @@ import ( // BuiltinDrivers contains the built in registered drivers // which are available for allocation handling var BuiltinDrivers = map[string]Factory{ - "docker": NewDockerDriver, - "exec": NewExecDriver, - "raw_exec": NewRawExecDriver, - "java": NewJavaDriver, - "qemu": NewQemuDriver, - "rkt": NewRktDriver, + "docker": NewDockerDriver, + "exec": NewExecDriver, + "raw_exec": NewRawExecDriver, + "java": NewJavaDriver, + "qemu": NewQemuDriver, + "rkt": NewRktDriver, + "mock_driver": NewMockDriver, } // NewDriver is used to instantiate and return a new driver diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go new file mode 100644 index 000000000..8c0d23b79 --- /dev/null +++ b/client/driver/mock_driver.go @@ -0,0 +1,168 @@ +package driver + +import ( + "errors" + "log" + "time" + + "github.com/mitchellh/mapstructure" + + "github.com/hashicorp/nomad/client/config" + dstructs "github.com/hashicorp/nomad/client/driver/structs" + "github.com/hashicorp/nomad/client/fingerprint" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" +) + +// MockDriverConfig is the driver configuration for the MockDriver +type MockDriverConfig struct { + + // KillAfter is the duration after which the mock driver indicates the task + // has exited after getting the initial SIGINT signal + KillAfter time.Duration `mapstructure:"kill_after"` + + // RunFor is the duration for which the fake task runs for. After this + // period the MockDriver responds to the task running indicating that the + // task has terminated + RunFor time.Duration `mapstructure:"run_for"` + + // ExitCode is the exit code with which the MockDriver indicates the task + // has exited + ExitCode int `mapstructure:"exit_code"` + + // ExitSignal is the signal with which the MockDriver indicates the task has + // been killed + ExitSignal int `mapstructure:"exit_signal"` + + // ExitErrMsg is the error message that the task returns while exiting + ExitErrMsg string `mapstructure:"exit_err_msg"` +} + +// MockDriver is a driver which is used for testing purposes +type MockDriver struct { + DriverContext + fingerprint.StaticFingerprinter +} + +// NewMockDriver is a factory method which returns a new Mock Driver +func NewMockDriver(ctx *DriverContext) Driver { + return &MockDriver{DriverContext: *ctx} +} + +// Start starts the mock driver +func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { + var driverConfig MockDriverConfig + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &driverConfig, + }) + if err != nil { + return nil, err + } + if err := dec.Decode(task.Config); err != nil { + return nil, err + } + + h := mockDriverHandle{ + taskName: task.Name, + runFor: driverConfig.RunFor, + killAfter: driverConfig.KillAfter, + killTimeout: task.KillTimeout, + exitCode: driverConfig.ExitCode, + exitSignal: driverConfig.ExitSignal, + logger: m.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *dstructs.WaitResult, 1), + } + if driverConfig.ExitErrMsg != "" { + h.exitErr = errors.New(driverConfig.ExitErrMsg) + } + m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name) + go h.run() + return &h, nil +} + +// TODO implement Open when we need it. +// Open re-connects the driver to the running task +func (m *MockDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + return nil, nil +} + +// TODO implement Open when we need it. +// Validate validates the mock driver configuration +func (m *MockDriver) Validate(map[string]interface{}) error { + return nil +} + +// TODO implement Open when we need it. +// Fingerprint fingerprints a node and returns if MockDriver is enabled +func (m *MockDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) { + node.Attributes["driver.mock_driver"] = "1" + return true, nil +} + +// MockDriverHandle is a driver handler which supervises a mock task +type mockDriverHandle struct { + taskName string + runFor time.Duration + killAfter time.Duration + killTimeout time.Duration + exitCode int + exitSignal int + exitErr error + logger *log.Logger + waitCh chan *dstructs.WaitResult + doneCh chan struct{} +} + +// TODO Implement when we need it. +func (h *mockDriverHandle) ID() string { + return "" +} + +// TODO Implement when we need it. +func (h *mockDriverHandle) WaitCh() chan *dstructs.WaitResult { + return h.waitCh +} + +// TODO Implement when we need it. +func (h *mockDriverHandle) Update(task *structs.Task) error { + return nil +} + +// Kill kills a mock task +func (h *mockDriverHandle) Kill() error { + h.logger.Printf("[DEBUG] driver.mock: killing task %q after kill timeout: %v", h.taskName, h.killTimeout) + select { + case <-h.doneCh: + case <-time.After(h.killAfter): + close(h.doneCh) + case <-time.After(h.killTimeout): + h.logger.Printf("[DEBUG] driver.mock: terminating task %q", h.taskName) + close(h.doneCh) + } + return nil +} + +// TODO Implement when we need it. +func (h *mockDriverHandle) Stats() (*cstructs.TaskResourceUsage, error) { + return nil, nil +} + +// run waits for the configured amount of time and then indicates the task has +// terminated +func (h *mockDriverHandle) run() { + timer := time.NewTimer(h.runFor) + defer timer.Stop() + for { + select { + case <-timer.C: + close(h.doneCh) + case <-h.doneCh: + h.logger.Printf("[DEBUG] driver.mock: finished running task %q", h.taskName) + h.waitCh <- dstructs.NewWaitResult(h.exitCode, h.exitSignal, h.exitErr) + return + } + } +} diff --git a/client/task_runner.go b/client/task_runner.go index 89d5f9f14..37f228ac2 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -507,7 +507,9 @@ func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) { r.resourceUsageLock.Lock() r.resourceUsage = ru r.resourceUsageLock.Unlock() - r.emitStats(ru) + if ru != nil { + r.emitStats(ru) + } case <-stopCollection: return } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5a25d99c6..b95b88d9a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2632,6 +2632,16 @@ func (a *Allocation) TerminalStatus() bool { } } +// Terminated returns if the allocation is in a terminal state on a client. +func (a *Allocation) Terminated() bool { + if a.ClientStatus == AllocClientStatusFailed || + a.ClientStatus == AllocClientStatusComplete || + a.ClientStatus == AllocClientStatusLost { + return true + } + return false +} + // RanSuccessfully returns whether the client has ran the allocation and all // tasks finished successfully func (a *Allocation) RanSuccessfully() bool { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index ec0ee0aa3..413603d70 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1091,3 +1091,43 @@ func TestTaskArtifact_Validate_Checksum(t *testing.T) { } } } + +func TestAllocation_Terminated(t *testing.T) { + type desiredState struct { + ClientStatus string + DesiredStatus string + Terminated bool + } + + harness := []desiredState{ + { + ClientStatus: AllocClientStatusPending, + DesiredStatus: AllocDesiredStatusStop, + Terminated: false, + }, + { + ClientStatus: AllocClientStatusRunning, + DesiredStatus: AllocDesiredStatusStop, + Terminated: false, + }, + { + ClientStatus: AllocClientStatusFailed, + DesiredStatus: AllocDesiredStatusStop, + Terminated: true, + }, + { + ClientStatus: AllocClientStatusFailed, + DesiredStatus: AllocDesiredStatusRun, + Terminated: true, + }, + } + + for _, state := range harness { + alloc := Allocation{} + alloc.DesiredStatus = state.DesiredStatus + alloc.ClientStatus = state.ClientStatus + if alloc.Terminated() != state.Terminated { + t.Fatalf("expected: %v, actual: %v", state.Terminated, alloc.Terminated()) + } + } +} From 92fc6ba32f77a51c44b45f36155266a6c6c154e4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 22 Aug 2016 15:02:28 -0500 Subject: [PATCH 2/2] Putting the mock driver behind a build flag --- client/client.go | 2 +- client/driver/driver.go | 13 ++++++------- client/driver/driver_mock.go | 8 ++++++++ client/driver/mock_driver.go | 2 ++ scripts/test.sh | 4 ++-- 5 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 client/driver/driver_mock.go diff --git a/client/client.go b/client/client.go index 17015da3c..8379fd4bf 100644 --- a/client/client.go +++ b/client/client.go @@ -978,7 +978,7 @@ func (c *Client) allocSync() { c.blockedAllocsLock.Lock() if blockedAlloc, ok := c.blockedAllocations[alloc.ID]; ok && alloc.Terminated() { if err := c.addAlloc(blockedAlloc); err != nil { - c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", + c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v", blockedAlloc.ID, err) } delete(c.blockedAllocations, blockedAlloc.PreviousAllocation) diff --git a/client/driver/driver.go b/client/driver/driver.go index cafe42105..112626585 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -19,13 +19,12 @@ import ( // BuiltinDrivers contains the built in registered drivers // which are available for allocation handling var BuiltinDrivers = map[string]Factory{ - "docker": NewDockerDriver, - "exec": NewExecDriver, - "raw_exec": NewRawExecDriver, - "java": NewJavaDriver, - "qemu": NewQemuDriver, - "rkt": NewRktDriver, - "mock_driver": NewMockDriver, + "docker": NewDockerDriver, + "exec": NewExecDriver, + "raw_exec": NewRawExecDriver, + "java": NewJavaDriver, + "qemu": NewQemuDriver, + "rkt": NewRktDriver, } // NewDriver is used to instantiate and return a new driver diff --git a/client/driver/driver_mock.go b/client/driver/driver_mock.go new file mode 100644 index 000000000..c0db54e85 --- /dev/null +++ b/client/driver/driver_mock.go @@ -0,0 +1,8 @@ +// +build nomad_test + +package driver + +// Add the mock driver +func init() { + BuiltinDrivers["mock_driver"] = NewMockDriver +} diff --git a/client/driver/mock_driver.go b/client/driver/mock_driver.go index 8c0d23b79..bc1004683 100644 --- a/client/driver/mock_driver.go +++ b/client/driver/mock_driver.go @@ -1,3 +1,5 @@ +// +build nomad_test + package driver import ( diff --git a/scripts/test.sh b/scripts/test.sh index 8e9622781..033876ddf 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -6,11 +6,11 @@ trap "rm -rf $TEMPDIR" EXIT HUP INT QUIT TERM # Build the Nomad binary for the API tests echo "--> Building nomad" -go build -o $TEMPDIR/nomad || exit 1 +go build -tags "nomad_test" -o $TEMPDIR/nomad || exit 1 # Run the tests echo "--> Running tests" GOBIN="`which go`" sudo -E PATH=$TEMPDIR:$PATH -E GOPATH=$GOPATH \ - $GOBIN test -v ${GOTEST_FLAGS:--cover -timeout=900s} $($GOBIN list ./... | grep -v /vendor/) + $GOBIN test -tags "nomad_test" -v ${GOTEST_FLAGS:--cover -timeout=900s} $($GOBIN list ./... | grep -v /vendor/)