From 2417ec56214120b33e2d90f02d0ad5c6ad7e87d4 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 11 Oct 2018 18:03:48 -0700 Subject: [PATCH 1/8] ar: fix task leader, update, and stop handling --- client/allocrunner/alloc_runner.go | 292 ++++++++++++------ client/allocrunner/alloc_runner_hooks.go | 2 - client/allocrunner/alloc_runner_test.go | 258 +++++++++++++++- client/allocrunner/taskrunner/task_runner.go | 26 +- .../taskrunner/task_runner_getters.go | 5 + 5 files changed, 448 insertions(+), 135 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 2e13092fc..c64ee3da6 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -34,9 +34,19 @@ type allocRunner struct { clientConfig *config.Config - // stateUpdater is used to emit updated task state + // stateUpdater is used to emit updated alloc state stateUpdater cinterfaces.AllocStateHandler + // taskStateUpdatedCh is ticked whenever task state has changed. Must + // have len==1 to allow nonblocking notification of state updates while + // the goroutine is already processing a previous update. + taskStateUpdatedCh chan struct{} + + // taskStateUpdateHandlerCh is closed when the task state handling + // goroutine exits. It is unsafe to destroy the local allocation state + // before this goroutine exits. 
+ taskStateUpdateHandlerCh chan struct{} + // consulClient is the client used by the consul service hook for // registering services and checks consulClient consul.ConsulServiceAPI @@ -70,8 +80,7 @@ type allocRunner struct { runnerHooks []interfaces.RunnerHook // tasks are the set of task runners - tasks map[string]*taskrunner.TaskRunner - tasksLock sync.RWMutex + tasks map[string]*taskrunner.TaskRunner // allocBroadcaster sends client allocation updates to all listeners allocBroadcaster *cstructs.AllocBroadcaster @@ -94,19 +103,21 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { } ar := &allocRunner{ - id: alloc.ID, - alloc: alloc, - clientConfig: config.ClientConfig, - consulClient: config.Consul, - vaultClient: config.Vault, - tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), - waitCh: make(chan struct{}), - state: &state.State{}, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - allocBroadcaster: cstructs.NewAllocBroadcaster(), - prevAllocWatcher: config.PrevAllocWatcher, - pluginSingletonLoader: config.PluginSingletonLoader, + id: alloc.ID, + alloc: alloc, + clientConfig: config.ClientConfig, + consulClient: config.Consul, + vaultClient: config.Vault, + tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), + waitCh: make(chan struct{}), + state: &state.State{}, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + taskStateUpdatedCh: make(chan struct{}, 1), + taskStateUpdateHandlerCh: make(chan struct{}), + allocBroadcaster: cstructs.NewAllocBroadcaster(), + prevAllocWatcher: config.PrevAllocWatcher, + pluginSingletonLoader: config.PluginSingletonLoader, } // Create the logger based on the allocation ID @@ -157,32 +168,22 @@ func (ar *allocRunner) WaitCh() <-chan struct{} { return ar.waitCh } -// XXX How does alloc Restart work // Run is the main goroutine that executes all the tasks. 
func (ar *allocRunner) Run() { - // Close the wait channel + // Close the wait channel on return defer close(ar.waitCh) - var taskWaitCh <-chan struct{} + // Start the task state update handler + go ar.handleTaskStateUpdates() // Run the prestart hooks - // XXX Equivalent to TR.Prestart hook if err := ar.prerun(); err != nil { ar.logger.Error("prerun failed", "error", err) goto POST } - // Run the runners - taskWaitCh = ar.runImpl() - -MAIN: - for { - select { - case <-taskWaitCh: - // TaskRunners have all exited - break MAIN - } - } + // Run the runners and block until they exit + <-ar.runImpl() POST: // Run the postrun hooks @@ -244,71 +245,130 @@ func (ar *allocRunner) Restore() error { } // TaskStateUpdated is called by TaskRunner when a task's state has been -// updated. This hook is used to compute changes to the alloc's ClientStatus -// and to update the server with the new state. +// updated. It does not process the update synchronously but instead notifies a +// goroutine the state has changed. Since processing the state change may cause +// the task to be killed (thus change its state again) it cannot be done +// synchronously as it would cause a deadlock due to reentrancy. +// +// The goroutine is used to compute changes to the alloc's ClientStatus and to +// update the server with the new state. func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) { - // If a task is dead, we potentially want to kill other tasks in the group - if state.State == structs.TaskStateDead { - // Find all tasks that are not the one that is dead and check if the one - // that is dead is a leader - var otherTaskRunners []*taskrunner.TaskRunner - var otherTaskNames []string - leader := false + select { + case ar.taskStateUpdatedCh <- struct{}{}: + default: + // already pending updates + } +} + +// handleTaskStateUpdates must be run in a goroutine as it monitors +// taskStateUpdatedCh for task state update notifications and processes task +// states. 
+// +// Processing task state updates must be done in a goroutine as it may have to +// kill tasks which causes further task state updates. +func (ar *allocRunner) handleTaskStateUpdates() { + defer close(ar.taskStateUpdateHandlerCh) + + //TODO allocdirs are being left in working dir not temp! + //TODO remove taskname and state from TaskStateUpdated + //TODO Ensure Client updates don't callback into AR + + for done := false; !done; { + select { + case <-ar.taskStateUpdatedCh: + case <-ar.waitCh: + // Tasks have exited, run once more to ensure final + // states are collected. + done = true + } + + // Set with the appropriate event if task runners should be + // killed. + var killEvent *structs.TaskEvent + + // If task runners should be killed, this is set to the task + // name whose fault it is. + killTask := "" + + // True if task runners should be killed because a leader + // failed (informational). + leaderFailed := false + + // Task state has been updated; gather the state of the other tasks + trNum := len(ar.tasks) + liveRunners := make([]*taskrunner.TaskRunner, 0, trNum) + states := make(map[string]*structs.TaskState, trNum) + for name, tr := range ar.tasks { - if name != taskName { - otherTaskRunners = append(otherTaskRunners, tr) - otherTaskNames = append(otherTaskNames, name) - } else if tr.Task().Leader { - leader = true - } - } - - // If the task failed, we should kill all the other tasks in the task group. 
- if state.Failed { - if len(otherTaskRunners) > 0 { - ar.logger.Debug("task failure, destroying all tasks", "failed_task", taskName, "destroying", otherTaskNames) - } - for _, tr := range otherTaskRunners { - tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName)) - } - } else if leader { - if len(otherTaskRunners) > 0 { - ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", taskName, "destroying", otherTaskNames) - } - // If the task was a leader task we should kill all the other tasks. - for _, tr := range otherTaskRunners { - tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskLeaderDead)) - } - } - } - - // Gather the state of the other tasks - ar.tasksLock.RLock() - states := make(map[string]*structs.TaskState, len(ar.tasks)) - for name, tr := range ar.tasks { - if name == taskName { + state := tr.TaskState() states[name] = state - } else { - states[name] = tr.TaskState() + + // Capture live task runners in case we need to kill them + if state.State != structs.TaskStateDead { + liveRunners = append(liveRunners, tr) + continue + } + + // Task is dead, determine if other tasks should be killed + if state.Failed { + // Only set failed event if no event has been + // set yet to give dead leaders priority. + if killEvent == nil { + killTask = name + killEvent = structs.NewTaskEvent(structs.TaskSiblingFailed). 
+ SetFailedSibling(name) + } + } else if tr.IsLeader() { + killEvent = structs.NewTaskEvent(structs.TaskLeaderDead) + leaderFailed = true + killTask = name + } } + + // If there's a kill event set and live runners, kill them + if killEvent != nil && len(liveRunners) > 0 { + + // Log kill reason + if leaderFailed { + ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) + } else { + ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + } + + // Kill live task runners + var leader *taskrunner.TaskRunner + for _, tr := range liveRunners { + if tr.IsLeader() { + // Capture the leader to kill last + leader = tr + continue + } + + tr.Kill(context.Background(), killEvent) + } + + // Kill leader last if it exists + if leader != nil { + leader.Kill(context.Background(), killEvent) + } + } + + // Get the client allocation + calloc := ar.clientAlloc(states) + + // Update the server + ar.stateUpdater.AllocStateUpdated(calloc) + + // Broadcast client alloc to listeners + ar.allocBroadcaster.Send(calloc) } - ar.tasksLock.RUnlock() - - // Get the client allocation - calloc := ar.clientAlloc(states) - - // Update the server - ar.stateUpdater.AllocStateUpdated(calloc) - - // Broadcast client alloc to listeners - ar.allocBroadcaster.Send(calloc) } // clientAlloc takes in the task states and returns an Allocation populated // with Client specific fields func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation { - ar.stateLock.RLock() - defer ar.stateLock.RUnlock() + ar.stateLock.Lock() + defer ar.stateLock.Unlock() // store task states for AllocState to expose ar.state.TaskStates = taskStates @@ -405,12 +465,10 @@ func (ar *allocRunner) AllocState() *state.State { // If TaskStateUpdated has not been called yet, ar.state.TaskStates // won't be set as it is not the canonical source of TaskStates. 
if len(state.TaskStates) == 0 { - ar.tasksLock.RLock() ar.state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks)) for k, tr := range ar.tasks { state.TaskStates[k] = tr.TaskState() } - ar.tasksLock.RUnlock() } return state @@ -422,6 +480,9 @@ func (ar *allocRunner) AllocState() *state.State { // If there is already a pending update it will be discarded and replaced by // the latest update. func (ar *allocRunner) Update(update *structs.Allocation) { + // Detect Stop updates + stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus() + // Update ar.alloc ar.setAlloc(update) @@ -430,6 +491,34 @@ func (ar *allocRunner) Update(update *structs.Allocation) { ar.logger.Error("error running update hooks", "error", err) } + // If alloc is being terminated, kill all tasks, leader first + if stopping { + // Kill leader first + for name, tr := range ar.tasks { + if !tr.IsLeader() { + continue + } + + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil { + ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) + } + break + } + + // Kill the rest + for name, tr := range ar.tasks { + if tr.IsLeader() { + continue + } + + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil { + ar.logger.Warn("error stopping task", "error", err, "task_name", name) + } + } + } + // Update task runners for _, tr := range ar.tasks { tr.Update(update) @@ -446,19 +535,21 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener { // This method is safe for calling concurrently with Run() and will cause it to // exit (thus closing WaitCh). 
func (ar *allocRunner) Destroy() { + ar.destroyedLock.Lock() + if ar.destroyed { + // Only destroy once + ar.destroyedLock.Unlock() + return + } + defer ar.destroyedLock.Unlock() + // Stop tasks - ar.tasksLock.RLock() for name, tr := range ar.tasks { err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - if err == taskrunner.ErrTaskNotRunning { - ar.logger.Trace("task not running", "task_name", name) - } else { - ar.logger.Warn("failed to kill task", "error", err, "task_name", name) - } + if err != nil && err != taskrunner.ErrTaskNotRunning { + ar.logger.Warn("failed to kill task", "error", err, "task_name", name) } } - ar.tasksLock.RUnlock() // Wait for tasks to exit and postrun hooks to finish <-ar.waitCh @@ -468,15 +559,17 @@ func (ar *allocRunner) Destroy() { ar.logger.Warn("error running destroy hooks", "error", err) } + // Wait for task state update handler to exit before removing local + // state. + <-ar.taskStateUpdateHandlerCh + // Cleanup state db if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil { ar.logger.Warn("failed to delete allocation state", "error", err) } // Mark alloc as destroyed - ar.destroyedLock.Lock() ar.destroyed = true - ar.destroyedLock.Unlock() } // IsDestroyed returns true if the alloc runner has been destroyed (stopped and @@ -514,9 +607,6 @@ func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter { // LatestAllocStats returns the latest stats for an allocation. If taskFilter // is set, only stats for that task -- if it exists -- are returned. 
func (ar *allocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) { - ar.tasksLock.RLock() - defer ar.tasksLock.RUnlock() - astat := &cstructs.AllocResourceUsage{ Tasks: make(map[string]*cstructs.TaskResourceUsage, len(ar.tasks)), ResourceUsage: &cstructs.ResourceUsage{ diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 02372f9a8..5db5a5082 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -40,7 +40,6 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents a.ar.stateLock.Unlock() // If deployment is unhealthy emit task events explaining why - a.ar.tasksLock.RLock() if !healthy && isDeploy { for task, event := range trackerTaskEvents { if tr, ok := a.ar.tasks[task]; ok { @@ -56,7 +55,6 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents for name, tr := range a.ar.tasks { states[name] = tr.TaskState() } - a.ar.tasksLock.RUnlock() // Build the client allocation calloc := a.ar.clientAlloc(states) diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 0638d0382..3c50983a2 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1,48 +1,272 @@ package allocrunner import ( + "fmt" + "sync" "testing" + "time" + "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" + consulapi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/shared/catalog" "github.com/hashicorp/nomad/plugins/shared/loader" "github.com/hashicorp/nomad/plugins/shared/singleton" + "github.com/hashicorp/nomad/testutil" 
"github.com/stretchr/testify/require" ) +// MockStateUpdater implements the AllocStateHandler interface and records +// alloc updates. +type MockStateUpdater struct { + Updates []*structs.Allocation + mu sync.Mutex +} + +// AllocStateUpdated implements the AllocStateHandler interface and records an +// alloc update. +func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) { + m.mu.Lock() + m.Updates = append(m.Updates, alloc) + m.mu.Unlock() +} + +// Last returns a copy of the last alloc (or nil) update. Safe for concurrent +// access with updates. +func (m *MockStateUpdater) Last() *structs.Allocation { + m.mu.Lock() + defer m.mu.Unlock() + n := len(m.Updates) + if n == 0 { + return nil + } + return m.Updates[n-1].Copy() +} + +// Reset resets the recorded alloc updates. +func (m *MockStateUpdater) Reset() { + m.mu.Lock() + m.Updates = nil + m.mu.Unlock() +} + +// testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop +// versions of dependencies. +func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) *Config { + logger := testlog.HCLogger(t) + return &Config{ + // Copy the alloc in case the caller edits and reuses it + Alloc: alloc.Copy(), + Logger: logger, + ClientConfig: config.TestClientConfig(), + StateDB: state.NoopDB{}, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + Vault: vaultclient.NewMockVaultClient(), + StateUpdater: &MockStateUpdater{}, + PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, + PluginSingletonLoader: &loader.MockCatalog{}, + } +} + // TestAllocRunner_AllocState_Initialized asserts that getting TaskStates via // AllocState() are initialized even before the AllocRunner has run. 
func TestAllocRunner_AllocState_Initialized(t *testing.T) { - t.Skip("missing exec driver plugin implementation") t.Parallel() - alloc := mock.Alloc() - logger := testlog.HCLogger(t) - - conf := &Config{ - Alloc: alloc, - Logger: logger, - ClientConfig: config.TestClientConfig(), - StateDB: state.NoopDB{}, - Consul: nil, - Vault: nil, - StateUpdater: nil, - PrevAllocWatcher: nil, - PluginSingletonLoader: &loader.MockCatalog{}, - } + conf := testAllocRunnerConfig(t, mock.Alloc()) pluginLoader := catalog.TestPluginLoader(t) - conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, pluginLoader) + conf.PluginSingletonLoader = singleton.NewSingletonLoader(conf.Logger, pluginLoader) ar, err := NewAllocRunner(conf) require.NoError(t, err) allocState := ar.AllocState() require.NotNil(t, allocState) - require.NotNil(t, allocState.TaskStates[alloc.Job.TaskGroups[0].Tasks[0].Name]) + require.NotNil(t, allocState.TaskStates[conf.Alloc.Job.TaskGroups[0].Tasks[0].Name]) +} + +// TestAllocRunner_TaskLeader_KillTG asserts that when a leader task dies the +// entire task group is killed. 
+func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 + + // Create two tasks in the task group + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Name = "task1" + task.Driver = "mock_driver" + task.KillTimeout = 10 * time.Millisecond + task.Config = map[string]interface{}{ + "run_for": "10s", + } + + task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "task2" + task2.Driver = "mock_driver" + task2.Leader = true + task2.Config = map[string]interface{}{ + "run_for": "1s", + } + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2) + alloc.TaskResources[task2.Name] = task2.Resources + + conf := testAllocRunnerConfig(t, alloc) + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer ar.Destroy() + go ar.Run() + + // Wait for all tasks to be killed + upd := conf.StateUpdater.(*MockStateUpdater) + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + if last.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete) + } + + // Task1 should be killed because Task2 exited + state1 := last.TaskStates[task.Name] + if state1.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead) + } + if state1.FinishedAt.IsZero() || state1.StartedAt.IsZero() { + return false, fmt.Errorf("expected to have a start and finish time") + } + if len(state1.Events) < 2 { + // At least have a received and destroyed + return false, fmt.Errorf("Unexpected number of events") + } + + found := false + for _, e := range state1.Events { + if e.Type != structs.TaskLeaderDead { + found = true + } + } + + if !found { + return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead) + } + + // Task Two should be 
dead + state2 := last.TaskStates[task2.Name] + if state2.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead) + } + if state2.FinishedAt.IsZero() || state2.StartedAt.IsZero() { + return false, fmt.Errorf("expected to have a start and finish time") + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a +// leader the leader is stopped before other tasks. +func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 + + // Create 3 tasks in the task group + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Name = "follower1" + task.Driver = "mock_driver" + task.KillTimeout = 10 * time.Millisecond + task.Config = map[string]interface{}{ + "run_for": "10s", + } + + task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "leader" + task2.Driver = "mock_driver" + task2.Leader = true + task2.KillTimeout = 10 * time.Millisecond + task2.Config = map[string]interface{}{ + "run_for": "10s", + } + + task3 := alloc.Job.TaskGroups[0].Tasks[0].Copy() + task3.Name = "follower2" + task3.Driver = "mock_driver" + task3.KillTimeout = 10 * time.Millisecond + task3.Config = map[string]interface{}{ + "run_for": "10s", + } + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2, task3) + alloc.TaskResources[task2.Name] = task2.Resources + + conf := testAllocRunnerConfig(t, alloc) + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer ar.Destroy() + go ar.Run() + + // Wait for tasks to start + upd := conf.StateUpdater.(*MockStateUpdater) + last := upd.Last() + testutil.WaitForResult(func() (bool, error) { + last = upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + if n := len(last.TaskStates); n != 3 { + return false, fmt.Errorf("Not enough task states (want: 
3; found %d)", n) + } + for name, state := range last.TaskStates { + if state.State != structs.TaskStateRunning { + return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State) + } + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Reset updates + upd.Reset() + + // Stop alloc + update := alloc.Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(update) + + // Wait for tasks to stop + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower1"].FinishedAt.UnixNano() { + return false, fmt.Errorf("expected leader to finish before follower1: %s >= %s", + last.TaskStates["leader"].FinishedAt, last.TaskStates["follower1"].FinishedAt) + } + if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower2"].FinishedAt.UnixNano() { + return false, fmt.Errorf("expected leader to finish before follower2: %s >= %s", + last.TaskStates["leader"].FinishedAt, last.TaskStates["follower2"].FinishedAt) + } + return true, nil + }, func(err error) { + last := upd.Last() + for name, state := range last.TaskStates { + t.Logf("%s: %s", name, state.State) + } + t.Fatalf("err: %v", err) + }) } /* diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 8dffd65a4..f683544ba 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -52,10 +52,11 @@ const ( ) type TaskRunner struct { - // allocID and taskName are immutable so these fields may be accessed - // without locks - allocID string - taskName string + // allocID, taskName, and taskLeader are immutable so these fields may + // be accessed without locks + allocID string + taskName string + taskLeader bool alloc *structs.Allocation allocLock sync.Mutex @@ -203,6 +204,7 @@ func 
NewTaskRunner(config *Config) (*TaskRunner, error) { task: config.Task, taskDir: config.TaskDir, taskName: config.Task.Name, + taskLeader: config.Task.Leader, envBuilder: envBuilder, consulClient: config.Consul, vaultClient: config.VaultClient, @@ -398,15 +400,6 @@ func (tr *TaskRunner) handleUpdates() { return } - if tr.Alloc().TerminalStatus() { - // Terminal update: kill TaskRunner and let Run execute postrun hooks - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - tr.logger.Warn("error stopping task", "error", err) - } - continue - } - // Non-terminal update; run hooks tr.updateHooks() } @@ -618,6 +611,7 @@ func (tr *TaskRunner) Restore() error { // update. func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { tr.logger.Debug("setting task state", "state", state, "event", event.Type) + // Update the local state stateCopy := tr.setStateLocal(state, event) @@ -698,7 +692,6 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str // logged. Use AppendEvent to simply add a new event. func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.stateLock.Lock() - defer tr.stateLock.Unlock() tr.appendEvent(event) @@ -708,8 +701,11 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.logger.Warn("error persisting event", "error", err, "event", event) } + stateCopy := tr.state.Copy() + tr.stateLock.Unlock() + // Notify the alloc runner of the event - tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy()) + tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy) } // AppendEvent appends a new TaskEvent to this task's TaskState. 
The actual diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index 61eaf9711..3bce6ffcd 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -16,6 +16,11 @@ func (tr *TaskRunner) setAlloc(updated *structs.Allocation) { tr.allocLock.Unlock() } +// IsLeader returns true if this task is the leader of its task group. +func (tr *TaskRunner) IsLeader() bool { + return tr.taskLeader +} + func (tr *TaskRunner) Task() *structs.Task { tr.taskLock.RLock() defer tr.taskLock.RUnlock() From e029980b25da9adb8b3e677c538b0b9c2b4dd4ba Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 15 Oct 2018 20:07:16 -0700 Subject: [PATCH 2/8] tests: explicitly cleanup after clients --- client/acl_test.go | 16 ++--- client/alloc_endpoint_test.go | 21 +++--- client/allocrunner/alloc_runner_test.go | 40 +++++------ client/client_stats_endpoint_test.go | 7 +- client/client_test.go | 81 ++++++++++++----------- client/config/testing.go | 36 +++++++++- client/fingerprint_manager_test.go | 76 ++++++++------------- client/fs_endpoint_test.go | 88 ++++++++++++------------- client/rpc_test.go | 8 +-- client/testing.go | 20 ++++-- 10 files changed, 212 insertions(+), 181 deletions(-) diff --git a/client/acl_test.go b/client/acl_test.go index bb21fcb8b..12493e301 100644 --- a/client/acl_test.go +++ b/client/acl_test.go @@ -17,11 +17,11 @@ func TestClient_ACL_resolveTokenValue(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 c.ACLEnabled = true }) - defer c1.Shutdown() + defer cleanup() // Create a policy / token policy := mock.ACLPolicy() @@ -66,11 +66,11 @@ func TestClient_ACL_resolvePolicies(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, 
cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 c.ACLEnabled = true }) - defer c1.Shutdown() + defer cleanup() // Create a policy / token policy := mock.ACLPolicy() @@ -106,10 +106,10 @@ func TestClient_ACL_ResolveToken_Disabled(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() // Should always get nil when disabled aclObj, err := c1.ResolveToken("blah") @@ -122,11 +122,11 @@ func TestClient_ACL_ResolveToken(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 c.ACLEnabled = true }) - defer c1.Shutdown() + defer cleanup() // Create a policy / token policy := mock.ACLPolicy() diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go index b616a7c9a..f7ec3dc6c 100644 --- a/client/alloc_endpoint_test.go +++ b/client/alloc_endpoint_test.go @@ -16,7 +16,8 @@ import ( func TestAllocations_GarbageCollectAll(t *testing.T) { t.Parallel() require := require.New(t) - client := TestClient(t, nil) + client, cleanup := TestClient(t, nil) + defer cleanup() req := &nstructs.NodeSpecificRequest{} var resp nstructs.GenericResponse @@ -29,11 +30,11 @@ func TestAllocations_GarbageCollectAll_ACL(t *testing.T) { server, addr, root := testACLServer(t, nil) defer server.Shutdown() - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} c.ACLEnabled = true }) - defer client.Shutdown() + defer cleanup() // Try request without a token and expect failure { @@ -79,9 +80,10 @@ func TestAllocations_GarbageCollect(t *testing.T) { t.Skip("missing mock driver plugin implementation") t.Parallel() require := require.New(t) - client := TestClient(t, func(c 
*config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.GCDiskUsageThreshold = 100.0 }) + defer cleanup() a := mock.Alloc() a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" @@ -122,11 +124,11 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) { server, addr, root := testACLServer(t, nil) defer server.Shutdown() - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} c.ACLEnabled = true }) - defer client.Shutdown() + defer cleanup() // Try request without a token and expect failure { @@ -178,7 +180,8 @@ func TestAllocations_Stats(t *testing.T) { t.Skip("missing exec driver plugin implementation") t.Parallel() require := require.New(t) - client := TestClient(t, nil) + client, cleanup := TestClient(t, nil) + defer cleanup() a := mock.Alloc() require.Nil(client.addAlloc(a, "")) @@ -213,11 +216,11 @@ func TestAllocations_Stats_ACL(t *testing.T) { server, addr, root := testACLServer(t, nil) defer server.Shutdown() - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} c.ACLEnabled = true }) - defer client.Shutdown() + defer cleanup() // Try request without a token and expect failure { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 3c50983a2..92b8ddf27 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -15,7 +15,6 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/shared/catalog" - "github.com/hashicorp/nomad/plugins/shared/loader" "github.com/hashicorp/nomad/plugins/shared/singleton" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -56,21 +55,24 @@ func (m *MockStateUpdater) Reset() { } // testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop 
-// versions of dependencies. -func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) *Config { +// versions of dependencies along with a cleanup func. +func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) { logger := testlog.HCLogger(t) - return &Config{ + pluginLoader := catalog.TestPluginLoader(t) + clientConf, cleanup := config.TestClientConfig(t) + conf := &Config{ // Copy the alloc in case the caller edits and reuses it Alloc: alloc.Copy(), Logger: logger, - ClientConfig: config.TestClientConfig(), + ClientConfig: clientConf, StateDB: state.NoopDB{}, Consul: consulapi.NewMockConsulServiceClient(t, logger), Vault: vaultclient.NewMockVaultClient(), StateUpdater: &MockStateUpdater{}, PrevAllocWatcher: allocwatcher.NoopPrevAlloc{}, - PluginSingletonLoader: &loader.MockCatalog{}, + PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader), } + return conf, cleanup } // TestAllocRunner_AllocState_Initialized asserts that getting TaskStates via @@ -78,10 +80,11 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) *Config { func TestAllocRunner_AllocState_Initialized(t *testing.T) { t.Parallel() - conf := testAllocRunnerConfig(t, mock.Alloc()) + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() - pluginLoader := catalog.TestPluginLoader(t) - conf.PluginSingletonLoader = singleton.NewSingletonLoader(conf.Logger, pluginLoader) ar, err := NewAllocRunner(conf) require.NoError(t, err) @@ -105,7 +108,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { task.Driver = "mock_driver" task.KillTimeout = 10 * time.Millisecond task.Config = map[string]interface{}{ - "run_for": "10s", + "run_for": 10 * time.Second, } task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() @@ -113,12 +116,13 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { task2.Driver = "mock_driver" task2.Leader = true 
task2.Config = map[string]interface{}{ - "run_for": "1s", + "run_for": 1 * time.Second, } alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2) alloc.TaskResources[task2.Name] = task2.Resources - conf := testAllocRunnerConfig(t, alloc) + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() ar, err := NewAllocRunner(conf) require.NoError(t, err) defer ar.Destroy() @@ -186,31 +190,29 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { task := alloc.Job.TaskGroups[0].Tasks[0] task.Name = "follower1" task.Driver = "mock_driver" - task.KillTimeout = 10 * time.Millisecond task.Config = map[string]interface{}{ - "run_for": "10s", + "run_for": 10 * time.Second, } task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() task2.Name = "leader" task2.Driver = "mock_driver" task2.Leader = true - task2.KillTimeout = 10 * time.Millisecond task2.Config = map[string]interface{}{ - "run_for": "10s", + "run_for": 10 * time.Second, } task3 := alloc.Job.TaskGroups[0].Tasks[0].Copy() task3.Name = "follower2" task3.Driver = "mock_driver" - task3.KillTimeout = 10 * time.Millisecond task3.Config = map[string]interface{}{ - "run_for": "10s", + "run_for": 10 * time.Second, } alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2, task3) alloc.TaskResources[task2.Name] = task2.Resources - conf := testAllocRunnerConfig(t, alloc) + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() ar, err := NewAllocRunner(conf) require.NoError(t, err) defer ar.Destroy() diff --git a/client/client_stats_endpoint_test.go b/client/client_stats_endpoint_test.go index c16f91f3d..2349d26ad 100644 --- a/client/client_stats_endpoint_test.go +++ b/client/client_stats_endpoint_test.go @@ -14,7 +14,8 @@ import ( func TestClientStats_Stats(t *testing.T) { t.Parallel() require := require.New(t) - client := TestClient(t, nil) + client, cleanup := TestClient(t, nil) + defer cleanup() req := &nstructs.NodeSpecificRequest{} var resp 
structs.ClientStatsResponse @@ -30,11 +31,11 @@ func TestClientStats_Stats_ACL(t *testing.T) { server, addr, root := testACLServer(t, nil) defer server.Shutdown() - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} c.ACLEnabled = true }) - defer client.Shutdown() + defer cleanup() // Try request without a token and expect failure { diff --git a/client/client_test.go b/client/client_test.go index 5b293a261..58020ea6f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -37,7 +37,8 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) { func TestClient_StartStop(t *testing.T) { t.Parallel() - client := TestClient(t, nil) + client, cleanup := TestClient(t, nil) + defer cleanup() if err := client.Shutdown(); err != nil { t.Fatalf("err: %v", err) } @@ -49,10 +50,11 @@ func TestClient_BaseLabels(t *testing.T) { t.Parallel() assert := assert.New(t) - client := TestClient(t, nil) + client, cleanup := TestClient(t, nil) if err := client.Shutdown(); err != nil { t.Fatalf("err: %v", err) } + defer cleanup() // directly invoke this function, as otherwise this will fail on a CI build // due to a race condition @@ -74,10 +76,10 @@ func TestClient_RPC(t *testing.T) { s1, addr := testServer(t, nil) defer s1.Shutdown() - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} }) - defer c1.Shutdown() + defer cleanup() // RPC should succeed testutil.WaitForResult(func() (bool, error) { @@ -94,10 +96,10 @@ func TestClient_RPC_FireRetryWatchers(t *testing.T) { s1, addr := testServer(t, nil) defer s1.Shutdown() - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} }) - defer c1.Shutdown() + defer cleanup() watcher := c1.rpcRetryWatcher() @@ -122,10 +124,10 @@ func TestClient_RPC_Passthrough(t *testing.T) { s1, _ := 
testServer(t, nil) defer s1.Shutdown() - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() // RPC should succeed testutil.WaitForResult(func() (bool, error) { @@ -140,8 +142,8 @@ func TestClient_RPC_Passthrough(t *testing.T) { func TestClient_Fingerprint(t *testing.T) { t.Parallel() - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Ensure we are fingerprinting testutil.WaitForResult(func() (bool, error) { @@ -162,13 +164,13 @@ func TestClient_Fingerprint_Periodic(t *testing.T) { t.Skip("missing mock driver plugin implementation") t.Parallel() - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ driver.ShutdownPeriodicAfter: "true", driver.ShutdownPeriodicDuration: "1", } }) - defer c1.Shutdown() + defer cleanup() node := c1.config.Node { @@ -251,10 +253,10 @@ func TestClient_MixedTLS(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} }) - defer c1.Shutdown() + defer cleanup() req := structs.NodeSpecificRequest{ NodeID: c1.Node().ID, @@ -301,7 +303,7 @@ func TestClient_BadTLS(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} c.TLSConfig = &nconfig.TLSConfig{ EnableHTTP: true, @@ -312,7 +314,7 @@ func TestClient_BadTLS(t *testing.T) { KeyFile: badkey, } }) - defer c1.Shutdown() + defer cleanup() req := structs.NodeSpecificRequest{ NodeID: c1.Node().ID, @@ -339,10 +341,10 @@ func TestClient_Register(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := 
TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() req := structs.NodeSpecificRequest{ NodeID: c1.Node().ID, @@ -373,10 +375,10 @@ func TestClient_Heartbeat(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() req := structs.NodeSpecificRequest{ NodeID: c1.Node().ID, @@ -406,10 +408,10 @@ func TestClient_UpdateAllocStatus(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() // Wait until the node is ready waitTilNodeReady(c1, t) @@ -457,10 +459,10 @@ func TestClient_WatchAllocs(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() // Wait until the node is ready waitTilNodeReady(c1, t) @@ -552,11 +554,11 @@ func TestClient_SaveRestoreState(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.DevMode = false c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() // Wait until the node is ready waitTilNodeReady(c1, t) @@ -670,10 +672,10 @@ func TestClient_BlockedAllocations(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.RPCHandler = s1 }) - defer c1.Shutdown() + defer cleanup() // Wait for the node to be ready state := s1.State() @@ -781,10 +783,10 @@ func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) { t.Parallel() assert := 
assert.New(t) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.ACLEnabled = true }) - defer c.Shutdown() + defer cleanup() alloc := mock.Alloc() validToken, err := structs.GenerateMigrateToken(alloc.ID, c.secretNodeID()) @@ -797,10 +799,10 @@ func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) { t.Parallel() assert := assert.New(t) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.ACLEnabled = true }) - defer c.Shutdown() + defer cleanup() assert.Equal(c.ValidateMigrateToken("", ""), false) @@ -813,8 +815,8 @@ func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) { t.Parallel() assert := assert.New(t) - c := TestClient(t, func(c *config.Config) {}) - defer c.Shutdown() + c, cleanup := TestClient(t, func(c *config.Config) {}) + defer cleanup() assert.Equal(c.ValidateMigrateToken("", ""), true) } @@ -835,10 +837,10 @@ func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) { fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" ) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} }) - defer c1.Shutdown() + defer cleanup() // Registering a node over plaintext should succeed { @@ -911,7 +913,7 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) { fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" ) - c1 := TestClient(t, func(c *config.Config) { + c1, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{addr} c.TLSConfig = &nconfig.TLSConfig{ EnableHTTP: true, @@ -922,7 +924,7 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) { KeyFile: fookey, } }) - defer c1.Shutdown() + defer cleanup() // assert that when one node is running in encrypted mode, a RPC request to a // node running in plaintext mode should fail @@ -974,7 +976,8 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) { // nomad 
server list. func TestClient_ServerList(t *testing.T) { t.Parallel() - client := TestClient(t, func(c *config.Config) {}) + client, cleanup := TestClient(t, func(c *config.Config) {}) + defer cleanup() if s := client.GetServers(); len(s) != 0 { t.Fatalf("expected server lit to be empty but found: %+q", s) diff --git a/client/config/testing.go b/client/config/testing.go index c98cad8c9..8281938ff 100644 --- a/client/config/testing.go +++ b/client/config/testing.go @@ -1,13 +1,43 @@ package config import ( + "io/ioutil" + "os" + "path/filepath" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/go-testing-interface" ) -// TestClientConfig returns a default client configuration for test clients. -func TestClientConfig() *Config { +// TestClientConfig returns a default client configuration for test clients and +// a cleanup func to remove the state and alloc dirs when finished. +func TestClientConfig(t testing.T) (*Config, func()) { conf := DefaultConfig() + + // Create a tempdir to hold state and alloc subdirs + parent, err := ioutil.TempDir("", "nomadtest") + if err != nil { + t.Fatalf("error creating client dir: %v", err) + } + cleanup := func() { + os.RemoveAll(parent) + } + + allocDir := filepath.Join(parent, "allocs") + if err := os.Mkdir(allocDir, 0777); err != nil { + cleanup() + t.Fatalf("error creating alloc dir: %v", err) + } + conf.AllocDir = allocDir + + stateDir := filepath.Join(parent, "client") + if err := os.Mkdir(stateDir, 0777); err != nil { + cleanup() + t.Fatalf("error creating state dir: %v", err) + } + conf.StateDir = stateDir + conf.VaultConfig.Enabled = helper.BoolToPtr(false) conf.DevMode = true conf.Node = &structs.Node{ @@ -19,5 +49,5 @@ func TestClientConfig() *Config { // Loosen GC threshold conf.GCDiskUsageThreshold = 98.0 conf.GCInodeUsageThreshold = 98.0 - return conf + return conf, cleanup } diff --git a/client/fingerprint_manager_test.go b/client/fingerprint_manager_test.go index 
7b0804399..50fc114ea 100644 --- a/client/fingerprint_manager_test.go +++ b/client/fingerprint_manager_test.go @@ -20,10 +20,10 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) { t.Skip("missing mock driver plugin implementation") t.Parallel() require := require.New(t) - testClient := TestClient(t, nil) + testClient, cleanup := TestClient(t, nil) testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -48,10 +48,8 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) { func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, nil) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + testClient, cleanup := TestClient(t, nil) + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -76,14 +74,12 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { func TestFingerprintManager_Fingerprint_Run(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "true", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -109,15 +105,13 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { t.Skip("missing mock driver plugin implementation") t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "test.shutdown_periodic_after": "true", "test.shutdown_periodic_duration": "2", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := 
NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -172,16 +166,14 @@ func TestFingerprintManager_HealthCheck_Driver(t *testing.T) { t.Skip("missing mock driver plugin implementation") t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "1", "test.shutdown_periodic_after": "true", "test.shutdown_periodic_duration": "2", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -275,15 +267,13 @@ func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) { t.Skip("missing mock driver plugin implementation") t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "test.shutdown_periodic_after": "true", "test.shutdown_periodic_duration": "2", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -373,15 +363,13 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "test.shutdown_periodic_after": "true", "test.shutdown_periodic_duration": "2", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -404,15 +392,13 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { func TestFingerprintManager_Run_InBlacklist(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + 
testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "fingerprint.whitelist": " arch,memory,foo,bar ", "fingerprint.blacklist": " cpu ", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -437,15 +423,13 @@ func TestFingerprintManager_Run_Combination(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "fingerprint.whitelist": " arch,cpu,memory,foo,bar ", "fingerprint.blacklist": " memory,nomad ", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -471,15 +455,13 @@ func TestFingerprintManager_Run_Combination(t *testing.T) { func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "1", "driver.whitelist": " raw_exec , foo ", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -503,15 +485,13 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "1", "driver.whitelist": " foo,bar,baz ", } }) - - testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -538,7 +518,7 @@ func 
TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing. t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "1", "driver.whitelist": " raw_exec,exec,foo,bar,baz ", @@ -547,7 +527,7 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing. }) testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, @@ -572,14 +552,14 @@ func TestFingerprintManager_Run_DriverFailure(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "1", } }) testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() singLoader := testClient.config.PluginSingletonLoader @@ -626,7 +606,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) { t.Parallel() require := require.New(t) - testClient := TestClient(t, func(c *config.Config) { + testClient, cleanup := TestClient(t, func(c *config.Config) { c.Options = map[string]string{ "driver.raw_exec.enable": "1", "driver.whitelist": " raw_exec,foo,bar,baz ", @@ -635,7 +615,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) { }) testClient.logger = testlog.HCLogger(t) - defer testClient.Shutdown() + defer cleanup() fm := NewFingerprintManager( testClient.config.PluginSingletonLoader, diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 4986338e4..f3cf6d646 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -57,8 +57,8 @@ func TestFS_Stat_NoAlloc(t *testing.T) { require := require.New(t) // Start a client - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := 
TestClient(t, nil) + defer cleanup() // Make the request with bad allocation id req := &cstructs.FsStatRequest{ @@ -79,8 +79,8 @@ func TestFS_Stat(t *testing.T) { require := require.New(t) // Start a client - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Create and add an alloc a := mock.Alloc() @@ -134,11 +134,11 @@ func TestFS_Stat_ACL(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.ACLEnabled = true c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer client.Shutdown() + defer cleanup() // Create a bad token policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny}) @@ -196,8 +196,8 @@ func TestFS_List_NoAlloc(t *testing.T) { require := require.New(t) // Start a client - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Make the request with bad allocation id req := &cstructs.FsListRequest{ @@ -218,8 +218,8 @@ func TestFS_List(t *testing.T) { require := require.New(t) // Start a client - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Create and add an alloc a := mock.Alloc() @@ -273,11 +273,11 @@ func TestFS_List_ACL(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.ACLEnabled = true c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer client.Shutdown() + defer cleanup() // Create a bad token policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny}) @@ -335,8 +335,8 @@ func TestFS_Stream_NoAlloc(t *testing.T) { require := require.New(t) // Start a client - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Make the 
request with bad allocation id req := &cstructs.FsStreamRequest{ @@ -414,11 +414,11 @@ func TestFS_Stream_ACL(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.ACLEnabled = true c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer client.Shutdown() + defer cleanup() // Create a bad token policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) @@ -534,10 +534,10 @@ func TestFS_Stream(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() expected := "Hello from the other side" job := mock.BatchJob() @@ -645,10 +645,10 @@ func TestFS_Stream_Follow(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() expectedBase := "Hello from the other side" repeat := 10 @@ -743,10 +743,10 @@ func TestFS_Stream_Limit(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() var limit int64 = 5 full := "Hello from the other side" @@ -833,8 +833,8 @@ func TestFS_Logs_NoAlloc(t *testing.T) { require := require.New(t) // Start a client - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Make the request with bad allocation id req := &cstructs.FsLogsRequest{ @@ -916,10 +916,10 @@ func TestFS_Logs_TaskPending(t *testing.T) { defer s.Shutdown() 
testutil.WaitForLeader(t, s.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() job := mock.BatchJob() job.TaskGroups[0].Count = 1 @@ -1024,11 +1024,11 @@ func TestFS_Logs_ACL(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - client := TestClient(t, func(c *config.Config) { + client, cleanup := TestClient(t, func(c *config.Config) { c.ACLEnabled = true c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer client.Shutdown() + defer cleanup() // Create a bad token policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) @@ -1145,10 +1145,10 @@ func TestFS_Logs(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() expected := "Hello from the other side\n" job := mock.BatchJob() @@ -1247,10 +1247,10 @@ func TestFS_Logs_Follow(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() expectedBase := "Hello from the other side\n" repeat := 10 @@ -1545,8 +1545,8 @@ func TestFS_findClosest(t *testing.T) { func TestFS_streamFile_NoFile(t *testing.T) { t.Parallel() require := require.New(t) - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) @@ -1565,8 +1565,8 @@ func TestFS_streamFile_NoFile(t *testing.T) { func TestFS_streamFile_Modify(t *testing.T) { t.Parallel() - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer 
cleanup() // Get a temp alloc dir ad := tempAllocDir(t) @@ -1634,8 +1634,8 @@ func TestFS_streamFile_Modify(t *testing.T) { func TestFS_streamFile_Truncate(t *testing.T) { t.Parallel() - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Get a temp alloc dir ad := tempAllocDir(t) @@ -1739,8 +1739,8 @@ func TestFS_streamFile_Truncate(t *testing.T) { func TestFS_streamImpl_Delete(t *testing.T) { t.Parallel() - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Get a temp alloc dir ad := tempAllocDir(t) @@ -1807,8 +1807,8 @@ func TestFS_streamImpl_Delete(t *testing.T) { func TestFS_logsImpl_NoFollow(t *testing.T) { t.Parallel() - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Get a temp alloc dir and create the log dir ad := tempAllocDir(t) @@ -1874,8 +1874,8 @@ func TestFS_logsImpl_NoFollow(t *testing.T) { func TestFS_logsImpl_Follow(t *testing.T) { t.Parallel() - c := TestClient(t, nil) - defer c.Shutdown() + c, cleanup := TestClient(t, nil) + defer cleanup() // Get a temp alloc dir and create the log dir ad := tempAllocDir(t) diff --git a/client/rpc_test.go b/client/rpc_test.go index c25033923..404e47c12 100644 --- a/client/rpc_test.go +++ b/client/rpc_test.go @@ -19,10 +19,10 @@ func TestRpc_streamingRpcConn_badEndpoint(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Servers = []string{s1.GetConfig().RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { @@ -75,7 +75,7 @@ func TestRpc_streamingRpcConn_badEndpoint_TLS(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c := TestClient(t, func(c *config.Config) { + c, cleanup := TestClient(t, func(c *config.Config) { c.Region = "regionFoo" 
c.Servers = []string{s1.GetConfig().RPCAddr.String()} c.TLSConfig = &sconfig.TLSConfig{ @@ -87,7 +87,7 @@ func TestRpc_streamingRpcConn_badEndpoint_TLS(t *testing.T) { KeyFile: fookey, } }) - defer c.Shutdown() + defer cleanup() // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { diff --git a/client/testing.go b/client/testing.go index 941094990..bd67fd5a4 100644 --- a/client/testing.go +++ b/client/testing.go @@ -11,9 +11,14 @@ import ( "github.com/mitchellh/go-testing-interface" ) -// TestClient creates an in-memory client for testing purposes. -func TestClient(t testing.T, cb func(c *config.Config)) *Client { - conf := config.TestClientConfig() +// TestClient creates an in-memory client for testing purposes and returns a +// cleanup func to shutdown the client and remove the alloc and state dirs. +// +// There is no need to override the AllocDir or StateDir as they are randomized +// and removed in the returned cleanup function. If they are overridden in the +// callback then the caller still must run the returned cleanup func. 
+func TestClient(t testing.T, cb func(c *config.Config)) (*Client, func()) { + conf, cleanup := config.TestClientConfig(t) // Tighten the fingerprinter timeouts (must be done in client package // to avoid circular dependencies) @@ -38,7 +43,14 @@ func TestClient(t testing.T, cb func(c *config.Config)) *Client { mockService := consulApi.NewMockConsulServiceClient(t, logger) client, err := NewClient(conf, catalog, mockService) if err != nil { + cleanup() t.Fatalf("err: %v", err) } - return client + return client, func() { + // Shutdown client + client.Shutdown() + + // Call TestClientConfig cleanup + cleanup() + } } From 2aed3e852750e0444e1ba86df3463e976ffbe137 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 15 Oct 2018 20:38:12 -0700 Subject: [PATCH 3/8] ar: refactor task killing into 1 method Update comments and address some PR comments from #4775 --- client/allocrunner/alloc_runner.go | 108 +++++++++---------- client/allocrunner/interfaces/runner.go | 7 +- client/allocrunner/taskrunner/task_runner.go | 27 ++--- 3 files changed, 63 insertions(+), 79 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index c64ee3da6..eb13afa2e 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -42,7 +42,7 @@ type allocRunner struct { // the goroutine is already processing a previous update. taskStateUpdatedCh chan struct{} - // taskStateUpdateHandlerCh is closed when thte task state handling + // taskStateUpdateHandlerCh is closed when the task state handling // goroutine exits. It is unsafe to destroy the local allocation state // before this goroutine exits. taskStateUpdateHandlerCh chan struct{} @@ -252,7 +252,7 @@ func (ar *allocRunner) Restore() error { // // The goroutine is used to compute changes to the alloc's ClientStatus and to // update the server with the new state. 
-func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) { +func (ar *allocRunner) TaskStateUpdated() { select { case ar.taskStateUpdatedCh <- struct{}{}: default: @@ -269,10 +269,6 @@ func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskStat func (ar *allocRunner) handleTaskStateUpdates() { defer close(ar.taskStateUpdateHandlerCh) - //TODO allocdirs are being left in working dir not temp! - //TODO remove taskname and state from TaskStateUpdated - //TODO Ensure Client updates don't callback into AR - for done := false; !done; { select { case <-ar.taskStateUpdatedCh: @@ -335,22 +331,7 @@ func (ar *allocRunner) handleTaskStateUpdates() { ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) } - // Kill live task runners - var leader *taskrunner.TaskRunner - for _, tr := range liveRunners { - if tr.IsLeader() { - // Capture the leader to kill last - leader = tr - continue - } - - tr.Kill(context.Background(), killEvent) - } - - // Kill leader last if it exists - if leader != nil { - leader.Kill(context.Background(), killEvent) - } + ar.killTasks() } // Get the client allocation @@ -364,6 +345,41 @@ func (ar *allocRunner) handleTaskStateUpdates() { } } +// killTasks kills all task runners, leader (if there is one) first. Errors are +// logged except taskrunner.ErrTaskNotRunning which is ignored. 
+func (ar *allocRunner) killTasks() { + // Kill leader first, synchronously + for name, tr := range ar.tasks { + if !tr.IsLeader() { + continue + } + + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil { + ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) + } + break + } + + // Kill the rest concurrently + wg := sync.WaitGroup{} + for name, tr := range ar.tasks { + if tr.IsLeader() { + continue + } + + wg.Add(1) + go func(name string, tr *taskrunner.TaskRunner) { + defer wg.Done() + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil && err != taskrunner.ErrTaskNotRunning { + ar.logger.Warn("error stopping task", "error", err, "task_name", name) + } + }(name, tr) + } + wg.Wait() +} + // clientAlloc takes in the task states and returns an Allocation populated // with Client specific fields func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation { @@ -486,43 +502,24 @@ func (ar *allocRunner) Update(update *structs.Allocation) { // Update ar.alloc ar.setAlloc(update) - // Run hooks - if err := ar.update(update); err != nil { - ar.logger.Error("error running update hooks", "error", err) - } - - // If alloc is being terminated, kill all tasks, leader first - if stopping { - // Kill leader first - for name, tr := range ar.tasks { - if !tr.IsLeader() { - continue - } - - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) - } - break + // Run update hooks if not stopping or dead + if !update.TerminalStatus() { + if err := ar.update(update); err != nil { + ar.logger.Error("error running update hooks", "error", err) } - // Kill the rest - for name, tr := range ar.tasks { - if tr.IsLeader() { - continue - } - - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - 
ar.logger.Warn("error stopping task", "error", err, "task_name", name) - } - } } // Update task runners for _, tr := range ar.tasks { tr.Update(update) } + + // If alloc is being terminated, kill all tasks, leader first + if stopping { + ar.killTasks() + } + } func (ar *allocRunner) Listener() *cstructs.AllocListener { @@ -543,13 +540,8 @@ func (ar *allocRunner) Destroy() { } defer ar.destroyedLock.Unlock() - // Stop tasks - for name, tr := range ar.tasks { - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil && err != taskrunner.ErrTaskNotRunning { - ar.logger.Warn("failed to kill task", "error", err, "task_name", name) - } - } + // Stop any running tasks + ar.killTasks() // Wait for tasks to exit and postrun hooks to finish <-ar.waitCh diff --git a/client/allocrunner/interfaces/runner.go b/client/allocrunner/interfaces/runner.go index e488d932a..4789e4f94 100644 --- a/client/allocrunner/interfaces/runner.go +++ b/client/allocrunner/interfaces/runner.go @@ -2,8 +2,6 @@ package interfaces import ( "github.com/hashicorp/nomad/client/allocrunner/state" - "github.com/hashicorp/nomad/nomad/structs" - cstructs "github.com/hashicorp/nomad/client/structs" ) @@ -24,8 +22,9 @@ type AllocRunner interface { // TaskStateHandler exposes a handler to be called when a task's state changes type TaskStateHandler interface { - // TaskStateUpdated is used to emit updated task state - TaskStateUpdated(task string, state *structs.TaskState) + // TaskStateUpdated is used to notify the alloc runner about task state + // changes. 
+ TaskStateUpdated() } // AllocStatsReporter gives acess to the latest resource usage from the diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index f683544ba..d0448dcf2 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -610,26 +610,21 @@ func (tr *TaskRunner) Restore() error { // UpdateState sets the task runners allocation state and triggers a server // update. func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { - tr.logger.Debug("setting task state", "state", state, "event", event.Type) + tr.logger.Trace("setting task state", "state", state, "event", event.Type) // Update the local state - stateCopy := tr.setStateLocal(state, event) + tr.setStateLocal(state, event) // Notify the alloc runner of the transition - tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy) + tr.stateUpdater.TaskStateUpdated() } // setStateLocal updates the local in-memory state, persists a copy to disk and returns a // copy of the task's state. -func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *structs.TaskState { +func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) { tr.stateLock.Lock() defer tr.stateLock.Unlock() - //XXX REMOVE ME AFTER TESTING - if state == "" { - panic("UpdateState must not be called with an empty state") - } - // Update the task state oldState := tr.state.State taskState := tr.state @@ -681,8 +676,6 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str // try to persist it again. tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state) } - - return tr.state.Copy() } // EmitEvent appends a new TaskEvent to this task's TaskState. The actual @@ -692,6 +685,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str // logged. Use AppendEvent to simply add a new event. 
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.stateLock.Lock() + defer tr.stateLock.Unlock() tr.appendEvent(event) @@ -701,11 +695,8 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.logger.Warn("error persisting event", "error", err, "event", event) } - stateCopy := tr.state.Copy() - tr.stateLock.Unlock() - // Notify the alloc runner of the event - tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy) + tr.stateUpdater.TaskStateUpdated() } // AppendEvent appends a new TaskEvent to this task's TaskState. The actual @@ -769,8 +760,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) { // Update tr.alloc tr.setAlloc(update) - // Trigger update hooks - tr.triggerUpdateHooks() + // Trigger update hooks if not terminal + if !update.TerminalStatus() { + tr.triggerUpdateHooks() + } } // triggerUpdate if there isn't already an update pending. Should be called From d71e7666bdfa404ec81be98ec7995f43d37b0d59 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 16 Oct 2018 15:17:36 -0700 Subject: [PATCH 4/8] ar: fix leader handling, state restoring, and destroying unrun ARs * Migrated all of the old leader task tests and got them passing * Refactor and consolidate task killing code in AR to always kill leader tasks first * Fixed lots of issues with state restoring * Fixed deadlock in AR.Destroy if AR.Run had never been called * Added a new in memory statedb for testing --- client/allocrunner/alloc_runner.go | 47 +++-- client/allocrunner/alloc_runner_test.go | 83 ++++++++- client/allocrunner/taskrunner/state/state.go | 17 ++ client/allocrunner/taskrunner/task_runner.go | 41 ++--- .../taskrunner/task_runner_hooks.go | 5 +- client/client.go | 4 +- client/state/db_test.go | 169 ++++++++++++++++++ client/state/interface.go | 31 +++- client/state/memdb.go | 144 +++++++++++++++ client/state/noopdb.go | 6 +- client/state/state_database.go | 85 ++++++--- helper/boltdd/boltdd.go | 36 +++- helper/boltdd/boltdd_test.go | 4 +- 
nomad/structs/structs.go | 15 ++ 14 files changed, 613 insertions(+), 74 deletions(-) create mode 100644 client/state/db_test.go create mode 100644 client/state/memdb.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index eb13afa2e..2179dcf2d 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -58,8 +58,16 @@ type allocRunner struct { waitCh chan struct{} // destroyed is true when the Run() loop has exited, postrun hooks have - // run, and alloc runner has been destroyed - destroyed bool + // run, and alloc runner has been destroyed. Must acquire destroyedLock + // to access. + destroyed bool + + // runLaunched is true if Run() has been called. If this is false + // Destroy() does not wait on tasks to shut down as they are not + // running. Must acquire destroyedLock to access. + runLaunched bool + + // destroyedLock guards destroyed, runLaunched, and serializes Destroy() calls. destroyedLock sync.Mutex // Alloc captures the allocation being run. @@ -170,6 +178,21 @@ func (ar *allocRunner) WaitCh() <-chan struct{} { // Run is the main goroutine that executes all the tasks. func (ar *allocRunner) Run() { + ar.destroyedLock.Lock() + defer ar.destroyedLock.Unlock() + + if ar.destroyed { + // Run should not be called after Destroy is called. This is a + // programming error. + ar.logger.Error("alloc destroyed; cannot run") + return + } + ar.runLaunched = true + + go ar.runImpl() +} + +func (ar *allocRunner) runImpl() { // Close the wait channel on return defer close(ar.waitCh) @@ -183,7 +206,7 @@ func (ar *allocRunner) Run() { } // Run the runners and block until they exit - <-ar.runImpl() + <-ar.runTasks() POST: // Run the postrun hooks @@ -193,8 +216,8 @@ POST: } } -// runImpl is used to run the runners. -func (ar *allocRunner) runImpl() <-chan struct{} { +// runTasks is used to run the task runners. 
+func (ar *allocRunner) runTasks() <-chan struct{} { for _, task := range ar.tasks { go task.Run() } @@ -355,7 +378,7 @@ func (ar *allocRunner) killTasks() { } err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { + if err != nil && err != taskrunner.ErrTaskNotRunning { ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) } break @@ -543,8 +566,10 @@ func (ar *allocRunner) Destroy() { // Stop any running tasks ar.killTasks() - // Wait for tasks to exit and postrun hooks to finish - <-ar.waitCh + // Wait for tasks to exit and postrun hooks to finish (if they ran at all) + if ar.runLaunched { + <-ar.waitCh + } // Run destroy hooks if err := ar.destroy(); err != nil { @@ -552,8 +577,10 @@ func (ar *allocRunner) Destroy() { } // Wait for task state update handler to exit before removing local - // state. - <-ar.taskStateUpdateHandlerCh + // state if Run() ran at all. + if ar.runLaunched { + <-ar.taskStateUpdateHandlerCh + } // Cleanup state db if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 92b8ddf27..48b248f4a 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -126,7 +126,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { ar, err := NewAllocRunner(conf) require.NoError(t, err) defer ar.Destroy() - go ar.Run() + ar.Run() // Wait for all tasks to be killed upd := conf.StateUpdater.(*MockStateUpdater) @@ -216,7 +216,7 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { ar, err := NewAllocRunner(conf) require.NoError(t, err) defer ar.Destroy() - go ar.Run() + ar.Run() // Wait for tasks to start upd := conf.StateUpdater.(*MockStateUpdater) @@ -271,6 +271,85 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { }) } +// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a +// restored task group with a leader 
that failed before restoring the leader is +// not stopped as it does not exist. +// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932 +func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 + + // Create a leader and follower task in the task group + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Name = "follower1" + task.Driver = "mock_driver" + task.KillTimeout = 10 * time.Second + task.Config = map[string]interface{}{ + "run_for": "10s", + } + + task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "leader" + task2.Driver = "mock_driver" + task2.Leader = true + task2.KillTimeout = 10 * time.Millisecond + task2.Config = map[string]interface{}{ + "run_for": "10s", + } + + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2) + alloc.TaskResources[task2.Name] = task2.Resources + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + + // Use a memory backed statedb + conf.StateDB = state.NewMemDB() + + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer ar.Destroy() + + // Mimic Nomad exiting before the leader stopping is able to stop other tasks. 
+ ar.tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) + ar.tasks["follower1"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + + // Create a new AllocRunner to test RestoreState and Run + ar2, err := NewAllocRunner(conf) + require.NoError(t, err) + defer ar2.Destroy() + + if err := ar2.Restore(); err != nil { + t.Fatalf("error restoring state: %v", err) + } + ar2.Run() + + // Wait for tasks to be stopped because leader is dead + testutil.WaitForResult(func() (bool, error) { + alloc := ar2.Alloc() + for task, state := range alloc.TaskStates { + if state.State != structs.TaskStateDead { + return false, fmt.Errorf("Task %q should be dead: %v", task, state.State) + } + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Make sure it GCs properly + ar2.Destroy() + + select { + case <-ar2.WaitCh(): + // exited as expected + case <-time.After(10 * time.Second): + t.Fatalf("timed out waiting for AR to GC") + } +} + /* import ( diff --git a/client/allocrunner/taskrunner/state/state.go b/client/allocrunner/taskrunner/state/state.go index 94aedcc95..83481c1ac 100644 --- a/client/allocrunner/taskrunner/state/state.go +++ b/client/allocrunner/taskrunner/state/state.go @@ -25,6 +25,23 @@ func NewLocalState() *LocalState { } } +// Canonicalize ensures LocalState is in a consistent state by initializing +// Hooks and ensuring no HookState's are nil. Useful for cleaning unmarshalled +// state which may be in an unknown state. 
+func (s *LocalState) Canonicalize() { + if s.Hooks == nil { + // Hooks is nil, create it + s.Hooks = make(map[string]*HookState) + } else { + for k, v := range s.Hooks { + // Remove invalid nil entries from Hooks map + if v == nil { + delete(s.Hooks, k) + } + } + } +} + // Copy should be called with the lock held func (s *LocalState) Copy() *LocalState { // Create a copy diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index d0448dcf2..11168aac4 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -171,9 +171,6 @@ type Config struct { // VaultClient is the client to use to derive and renew Vault tokens VaultClient vaultclient.VaultClient - // LocalState is optionally restored task state - LocalState *state.LocalState - // StateDB is used to store and restore state. StateDB cstate.StateDB @@ -197,6 +194,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { config.ClientConfig.Region, ) + // Initialize state from alloc if it is set + tstate := structs.NewTaskState() + if ts := config.Alloc.TaskStates[config.Task.Name]; ts != nil { + tstate = ts.Copy() + } + tr := &TaskRunner{ alloc: config.Alloc, allocID: config.Alloc.ID, @@ -208,8 +211,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { envBuilder: envBuilder, consulClient: config.Consul, vaultClient: config.VaultClient, - state: config.Alloc.TaskStates[config.Task.Name].Copy(), - localState: config.LocalState, + state: tstate, + localState: state.NewLocalState(), stateDB: config.StateDB, stateUpdater: config.StateUpdater, ctx: trCtx, @@ -230,9 +233,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { } tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type) - // Initialize the task state - tr.initState() - // Get the driver if err := tr.initDriver(); err != nil { tr.logger.Error("failed to create driver", "error", err) @@ -248,17 +248,6 @@ 
func NewTaskRunner(config *Config) (*TaskRunner, error) { return tr, nil } -func (tr *TaskRunner) initState() { - if tr.state == nil { - tr.state = &structs.TaskState{ - State: structs.TaskStatePending, - } - } - if tr.localState == nil { - tr.localState = state.NewLocalState() - } -} - func (tr *TaskRunner) initLabels() { alloc := tr.Alloc() tr.baseLabels = []metrics.Label{ @@ -590,10 +579,6 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig { } } -// XXX If the objects don't exists since the client shutdown before the task -// runner ever saved state, then we should treat it as a new task runner and not -// return an error -// // Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner // but before Run so no locks need to be acquired. func (tr *TaskRunner) Restore() error { @@ -602,8 +587,14 @@ func (tr *TaskRunner) Restore() error { return err } - tr.localState = ls - tr.state = ts + if ls != nil { + ls.Canonicalize() + tr.localState = ls + } + if ts != nil { + ts.Canonicalize() + tr.state = ts + } return nil } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 42fb9f73f..4eaecc81b 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -101,8 +101,11 @@ func (tr *TaskRunner) prestart() error { TaskEnv: tr.envBuilder.Build(), } + var origHookState *state.HookState tr.localStateLock.RLock() - origHookState := tr.localState.Hooks[name] + if tr.localState.Hooks != nil { + origHookState = tr.localState.Hooks[name] + } tr.localStateLock.RUnlock() if origHookState != nil && origHookState.PrestartDone { tr.logger.Trace("skipping done prestart hook", "name", pre.Name()) diff --git a/client/client.go b/client/client.go index f1bc1ae47..8c4908795 100644 --- a/client/client.go +++ b/client/client.go @@ -805,7 +805,7 @@ func (c *Client) restoreState() error { // All allocs restored successfully, run 
them! c.allocLock.Lock() for _, ar := range c.allocs { - go ar.Run() + ar.Run() } c.allocLock.Unlock() @@ -1933,7 +1933,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // Store the alloc runner. c.allocs[alloc.ID] = ar - go ar.Run() + ar.Run() return nil } diff --git a/client/state/db_test.go b/client/state/db_test.go new file mode 100644 index 000000000..60d9489c0 --- /dev/null +++ b/client/state/db_test.go @@ -0,0 +1,169 @@ +package state + +import ( + "io/ioutil" + "os" + "testing" + + trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func setupBoltDB(t *testing.T) (*BoltStateDB, func()) { + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + + db, err := NewBoltStateDB(dir) + if err != nil { + if err := os.RemoveAll(dir); err != nil { + t.Logf("error removing boltdb dir: %v", err) + } + t.Fatalf("error creating boltdb: %v", err) + } + + cleanup := func() { + if err := db.Close(); err != nil { + t.Errorf("error closing boltdb: %v", err) + } + if err := os.RemoveAll(dir); err != nil { + t.Logf("error removing boltdb dir: %v", err) + } + } + + return db.(*BoltStateDB), cleanup +} + +func testDB(t *testing.T, f func(*testing.T, StateDB)) { + boltdb, cleanup := setupBoltDB(t) + defer cleanup() + + memdb := NewMemDB() + + impls := []StateDB{boltdb, memdb} + + for _, db := range impls { + db := db + t.Run(db.Name(), func(t *testing.T) { + f(t, db) + }) + } +} + +// TestStateDB_Allocations asserts the behavior of GetAllAllocations, PutAllocation, and +// DeleteAllocationBucket for all operational StateDB implementations. 
+func TestStateDB_Allocations(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require := require.New(t) + + // Empty database should return empty non-nil results + allocs, errs, err := db.GetAllAllocations() + require.NoError(err) + require.NotNil(allocs) + require.Empty(allocs) + require.NotNil(errs) + require.Empty(errs) + + // Put allocations + alloc1 := mock.Alloc() + alloc2 := mock.BatchAlloc() + require.NoError(db.PutAllocation(alloc1)) + require.NoError(db.PutAllocation(alloc2)) + + // Retrieve them + allocs, errs, err = db.GetAllAllocations() + require.NoError(err) + require.NotNil(allocs) + require.Len(allocs, 2) + require.Contains(allocs, alloc1) + require.Contains(allocs, alloc2) + require.NotNil(errs) + require.Empty(errs) + + // Add another + alloc3 := mock.SystemAlloc() + require.NoError(db.PutAllocation(alloc3)) + allocs, errs, err = db.GetAllAllocations() + require.NoError(err) + require.NotNil(allocs) + require.Len(allocs, 3) + require.Contains(allocs, alloc1) + require.Contains(allocs, alloc2) + require.Contains(allocs, alloc3) + require.NotNil(errs) + require.Empty(errs) + + // Deleting a nonexistent alloc is a noop + require.NoError(db.DeleteAllocationBucket("asdf")) + allocs, errs, err = db.GetAllAllocations() + require.NoError(err) + require.NotNil(allocs) + require.Len(allocs, 3) + + // Delete alloc1 + require.NoError(db.DeleteAllocationBucket(alloc1.ID)) + allocs, errs, err = db.GetAllAllocations() + require.NoError(err) + require.NotNil(allocs) + require.Len(allocs, 2) + require.Contains(allocs, alloc2) + require.Contains(allocs, alloc3) + require.NotNil(errs) + require.Empty(errs) + }) +} + +// TestStateDB_TaskState asserts the behavior of task state related StateDB +// methods. 
+func TestStateDB_TaskState(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require := require.New(t) + + // Getting nonexistent state should return nils + ls, ts, err := db.GetTaskRunnerState("allocid", "taskname") + require.NoError(err) + require.Nil(ls) + require.Nil(ts) + + // Putting TaskState without first putting the allocation should work + state := structs.NewTaskState() + state.Failed = true // set a non-default value + require.NoError(db.PutTaskState("allocid", "taskname", state)) + + // Getting should return the available state + ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") + require.NoError(err) + require.Nil(ls) + require.Equal(state, ts) + + // Deleting a nonexistent task should not error + require.NoError(db.DeleteTaskBucket("adsf", "asdf")) + require.NoError(db.DeleteTaskBucket("asllocid", "asdf")) + + // Data should be untouched + ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") + require.NoError(err) + require.Nil(ls) + require.Equal(state, ts) + + // Deleting the task should remove the state + require.NoError(db.DeleteTaskBucket("allocid", "taskname")) + ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") + require.NoError(err) + require.Nil(ls) + require.Nil(ts) + + // Putting LocalState should work just like TaskState + origLocalState := trstate.NewLocalState() + require.NoError(db.PutTaskRunnerLocalState("allocid", "taskname", origLocalState)) + ls, ts, err = db.GetTaskRunnerState("allocid", "taskname") + require.NoError(err) + require.Equal(origLocalState, ls) + require.Nil(ts) + }) +} diff --git a/client/state/interface.go b/client/state/interface.go index 6f571659c..0bea1de19 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -7,12 +7,41 @@ import ( // StateDB implementations store and load Nomad client state. type StateDB interface { + // Name of implementation. 
+ Name() string + + // GetAllAllocations returns all valid allocations and a map of + // allocation IDs to retrieval errors. + // + // If a single error is returned then both allocations and the map will be nil. GetAllAllocations() ([]*structs.Allocation, map[string]error, error) + + // PutAllocation stores an allocation or returns an error if it could + // not be stored. PutAllocation(*structs.Allocation) error + + // GetTaskRunnerState returns the LocalState and TaskState for a + // TaskRunner. Either state may be nil if it is not found, but if an + // error is encountered only the error will be non-nil. GetTaskRunnerState(allocID, taskName string) (*state.LocalState, *structs.TaskState, error) - PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error + + // PutTaskRunnerLocalState stores the LocalState for a TaskRunner or + // returns an error. + PutTaskRunnerLocalState(allocID, taskName string, val *state.LocalState) error + + // PutTaskState stores the TaskState for a TaskRunner or returns an + // error. PutTaskState(allocID, taskName string, state *structs.TaskState) error + + // DeleteTaskBucket deletes a task's state bucket if it exists. No + // error is returned if it does not exist. DeleteTaskBucket(allocID, taskName string) error + + // DeleteAllocationBucket deletes an allocation's state bucket if it + // exists. No error is returned if it does not exist. DeleteAllocationBucket(allocID string) error + + // Close the database. Unsafe for further use after calling regardless + // of return value. Close() error } diff --git a/client/state/memdb.go b/client/state/memdb.go new file mode 100644 index 000000000..f4e67ff05 --- /dev/null +++ b/client/state/memdb.go @@ -0,0 +1,144 @@ +package state + +import ( + "sync" + + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// MemDB implements a StateDB that stores data in memory and should only be +// used for testing. 
All methods are safe for concurrent use. +type MemDB struct { + // alloc_id -> value + allocs map[string]*structs.Allocation + + // alloc_id -> task_name -> value + localTaskState map[string]map[string]*state.LocalState + taskState map[string]map[string]*structs.TaskState + + mu sync.RWMutex +} + +func NewMemDB() *MemDB { + return &MemDB{ + allocs: make(map[string]*structs.Allocation), + localTaskState: make(map[string]map[string]*state.LocalState), + taskState: make(map[string]map[string]*structs.TaskState), + } +} + +func (m *MemDB) Name() string { + return "memdb" +} + +func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + allocs := make([]*structs.Allocation, 0, len(m.allocs)) + for _, v := range m.allocs { + allocs = append(allocs, v) + } + + return allocs, map[string]error{}, nil +} + +func (m *MemDB) PutAllocation(alloc *structs.Allocation) error { + m.mu.Lock() + defer m.mu.Unlock() + m.allocs[alloc.ID] = alloc + return nil +} + +func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + var ls *state.LocalState + var ts *structs.TaskState + + // Local Task State + allocLocalTS := m.localTaskState[allocID] + if len(allocLocalTS) != 0 { + ls = allocLocalTS[taskName] + } + + // Task State + allocTS := m.taskState[allocID] + if len(allocTS) != 0 { + ts = allocTS[taskName] + } + + return ls, ts, nil +} + +func (m *MemDB) PutTaskRunnerLocalState(allocID string, taskName string, val *state.LocalState) error { + m.mu.Lock() + defer m.mu.Unlock() + + if alts, ok := m.localTaskState[allocID]; ok { + alts[taskName] = val.Copy() + return nil + } + + m.localTaskState[allocID] = map[string]*state.LocalState{ + taskName: val.Copy(), + } + + return nil +} + +func (m *MemDB) PutTaskState(allocID string, taskName string, state *structs.TaskState) error { + m.mu.Lock() + defer m.mu.Unlock() + + if ats, 
ok := m.taskState[allocID]; ok { + ats[taskName] = state.Copy() + return nil + } + + m.taskState[allocID] = map[string]*structs.TaskState{ + taskName: state.Copy(), + } + + return nil +} + +func (m *MemDB) DeleteTaskBucket(allocID, taskName string) error { + m.mu.Lock() + defer m.mu.Unlock() + + if ats, ok := m.taskState[allocID]; ok { + delete(ats, taskName) + } + + if alts, ok := m.localTaskState[allocID]; ok { + delete(alts, taskName) + } + + return nil +} + +func (m *MemDB) DeleteAllocationBucket(allocID string) error { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.allocs, allocID) + delete(m.taskState, allocID) + delete(m.localTaskState, allocID) + + return nil +} + +func (m *MemDB) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + // Set everything to nil to blow up on further use + m.allocs = nil + m.taskState = nil + m.localTaskState = nil + + return nil +} diff --git a/client/state/noopdb.go b/client/state/noopdb.go index 05ee2b562..c20eae88d 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -8,6 +8,10 @@ import ( // NoopDB implements a StateDB that does not persist any data. 
type NoopDB struct{} +func (n NoopDB) Name() string { + return "noopdb" +} + func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { return nil, nil, nil } @@ -20,7 +24,7 @@ func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.Loca return nil, nil, nil } -func (n NoopDB) PutTaskRunnerLocalState(allocID string, taskName string, val interface{}) error { +func (n NoopDB) PutTaskRunnerLocalState(allocID string, taskName string, val *state.LocalState) error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index b63355b76..524848e13 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -21,9 +21,9 @@ allocations/ (bucket) */ var ( - // allocationsBucket is the bucket name containing all allocation related + // allocationsBucketName is the bucket name containing all allocation related // data - allocationsBucket = []byte("allocations") + allocationsBucketName = []byte("allocations") // allocKey is the key serialized Allocations are stored under allocKey = []byte("alloc") @@ -74,6 +74,10 @@ func NewBoltStateDB(stateDir string) (StateDB, error) { return sdb, nil } +func (s *BoltStateDB) Name() string { + return "boltdb" +} + // GetAllAllocations gets all allocations persisted by this client and returns // a map of alloc ids to errors for any allocations that could not be restored. // @@ -101,15 +105,15 @@ type allocEntry struct { } func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, map[string]error) { - allocationsBkt := tx.Bucket(allocationsBucket) + allocs := []*structs.Allocation{} + errs := map[string]error{} + + allocationsBkt := tx.Bucket(allocationsBucketName) if allocationsBkt == nil { // No allocs - return nil, nil + return allocs, errs } - var allocs []*structs.Allocation - errs := map[string]error{} - // Create a cursor for iteration. 
c := allocationsBkt.BoltBucket().Cursor() @@ -138,7 +142,7 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket - allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket) + allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucketName) if err != nil { return err } @@ -157,25 +161,53 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { }) } -// GetTaskRunnerState restores TaskRunner specific state. +// GetTaskRunnerState returns the LocalState and TaskState for a +// TaskRunner. LocalState or TaskState will be nil if they do not exist. +// +// If an error is encountered both LocalState and TaskState will be nil. func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.LocalState, *structs.TaskState, error) { - var ls trstate.LocalState - var ts structs.TaskState + var ls *trstate.LocalState + var ts *structs.TaskState err := s.db.View(func(tx *boltdd.Tx) error { - bkt, err := getTaskBucket(tx, allocID, taskName) - if err != nil { - return fmt.Errorf("failed to get task %q bucket: %v", taskName, err) + allAllocsBkt := tx.Bucket(allocationsBucketName) + if allAllocsBkt == nil { + // No state, return + return nil } - // Restore Local State - if err := bkt.Get(taskLocalStateKey, &ls); err != nil { - return fmt.Errorf("failed to read local task runner state: %v", err) + allocBkt := allAllocsBkt.Bucket([]byte(allocID)) + if allocBkt == nil { + // No state for alloc, return + return nil } - // Restore Task State - if err := bkt.Get(taskStateKey, &ts); err != nil { - return fmt.Errorf("failed to read task state: %v", err) + taskBkt := allocBkt.Bucket([]byte(taskName)) + if taskBkt == nil { + // No state for task, return + return nil + } + + // Restore Local State if it exists + ls = &trstate.LocalState{} + if err := 
taskBkt.Get(taskLocalStateKey, ls); err != nil { + if !boltdd.IsErrNotFound(err) { + return fmt.Errorf("failed to read local task runner state: %v", err) + } + + // Key not found, reset ls to nil + ls = nil + } + + // Restore Task State if it exists + ts = &structs.TaskState{} + if err := taskBkt.Get(taskStateKey, ts); err != nil { + if !boltdd.IsErrNotFound(err) { + return fmt.Errorf("failed to read task state: %v", err) + } + + // Key not found, reset ts to nil + ts = nil } return nil @@ -185,12 +217,11 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc return nil, nil, err } - return &ls, &ts, nil - + return ls, ts, nil } // PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error. -func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error { +func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error { return s.db.Update(func(tx *boltdd.Tx) error { taskBkt, err := getTaskBucket(tx, allocID, taskName) if err != nil { @@ -221,7 +252,7 @@ func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.Task func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket - allocations := tx.Bucket(allocationsBucket) + allocations := tx.Bucket(allocationsBucketName) if allocations == nil { return nil } @@ -242,7 +273,7 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket - allocations := tx.Bucket(allocationsBucket) + allocations := tx.Bucket(allocationsBucketName) if allocations == nil { return nil } @@ -267,13 +298,13 @@ func getAllocationBucket(tx *boltdd.Tx, allocID string) (*boltdd.Bucket, error) w := tx.Writable() // Retrieve the root allocations 
bucket - allocations := tx.Bucket(allocationsBucket) + allocations := tx.Bucket(allocationsBucketName) if allocations == nil { if !w { return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") } - allocations, err = tx.CreateBucketIfNotExists(allocationsBucket) + allocations, err = tx.CreateBucketIfNotExists(allocationsBucketName) if err != nil { return nil, err } diff --git a/helper/boltdd/boltdd.go b/helper/boltdd/boltdd.go index 1fa31870b..7e665745d 100644 --- a/helper/boltdd/boltdd.go +++ b/helper/boltdd/boltdd.go @@ -14,6 +14,29 @@ import ( "golang.org/x/crypto/blake2b" ) +// ErrNotFound is returned when a key is not found. +type ErrNotFound struct { + name string +} + +func (e *ErrNotFound) Error() string { + return fmt.Sprintf("key not found: %s", e.name) +} + +// NotFound returns a new error for a key that was not found. +func NotFound(name string) error { + return &ErrNotFound{name} +} + +// IsErrNotFound returns true if the error is an ErrNotFound error. +func IsErrNotFound(e error) bool { + if e == nil { + return false + } + _, ok := e.(*ErrNotFound) + return ok +} + // DB wraps an underlying bolt.DB to create write deduplicating buckets and // msgpack encoded values. type DB struct { @@ -277,12 +300,13 @@ func (b *Bucket) Put(key []byte, val interface{}) error { } -// Get value by key from boltdb or return an error if key not found. +// Get value by key from boltdb or return an ErrNotFound error if key not +// found. func (b *Bucket) Get(key []byte, obj interface{}) error { // Get the raw data from the underlying boltdb data := b.boltBucket.Get(key) if data == nil { - return fmt.Errorf("no data at key %v", string(key)) + return NotFound(string(key)) } // Deserialize the object @@ -342,11 +366,15 @@ func (b *Bucket) CreateBucketIfNotExists(name []byte) (*Bucket, error) { return newBucket(bmeta, bb), nil } -// DeleteBucket deletes a child bucket. 
Returns an error if the bucket does not -// exist or corresponds to a non-bucket key. +// DeleteBucket deletes a child bucket. Returns an error if the bucket +// corresponds to a non-bucket key or another error is encountered. No error is +// returned if the bucket does not exist. func (b *Bucket) DeleteBucket(name []byte) error { // Delete the bucket from the underlying boltdb err := b.boltBucket.DeleteBucket(name) + if err == bolt.ErrBucketNotFound { + err = nil + } // Remove reference to child bucket b.bm.deleteBucket(name) diff --git a/helper/boltdd/boltdd_test.go b/helper/boltdd/boltdd_test.go index da3681ef6..b31523bc0 100644 --- a/helper/boltdd/boltdd_test.go +++ b/helper/boltdd/boltdd_test.go @@ -232,7 +232,9 @@ func TestBucket_Delete(t *testing.T) { require.NoError(err) var v []byte - require.Error(child.Get(childKey, &v)) + err = child.Get(childKey, &v) + require.Error(err) + require.True(IsErrNotFound(err)) require.Equal(([]byte)(nil), v) require.Nil(child.Bucket(grandchildName1)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9b2b534a1..f26df1d5c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5573,6 +5573,21 @@ type TaskState struct { Events []*TaskEvent } +// NewTaskState returns a TaskState initialized in the Pending state. +func NewTaskState() *TaskState { + return &TaskState{ + State: TaskStatePending, + } +} + +// Canonicalize ensures the TaskState has a State set. It should default to +// Pending. 
+func (ts *TaskState) Canonicalize() { + if ts.State == "" { + ts.State = TaskStatePending + } +} + func (ts *TaskState) Copy() *TaskState { if ts == nil { return nil From 41b40b9296c8cfc396d3efe39fb6de9f32a37202 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 17 Oct 2018 17:14:27 -0700 Subject: [PATCH 5/8] plugins/drivers: fix panic when copying a nil TaskHandle --- plugins/drivers/task_handle.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/drivers/task_handle.go b/plugins/drivers/task_handle.go index e71288952..749c10314 100644 --- a/plugins/drivers/task_handle.go +++ b/plugins/drivers/task_handle.go @@ -29,6 +29,10 @@ func (h *TaskHandle) GetDriverState(v interface{}) error { } func (h *TaskHandle) Copy() *TaskHandle { + if h == nil { + return nil + } + handle := new(TaskHandle) *handle = *h handle.Config = h.Config.Copy() From 05365806ac3f365b8df99533d0376911368e76c3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 17 Oct 2018 17:14:44 -0700 Subject: [PATCH 6/8] ar: initialize allocwatcher on restore Fixes a panic. Left a comment on how the behavior could be improved, but this is what releases <0.9.0 did. --- client/client.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/client/client.go b/client/client.go index 8c4908795..f4a3810ac 100644 --- a/client/client.go +++ b/client/client.go @@ -760,6 +760,11 @@ func (c *Client) restoreState() error { var mErr multierror.Error for _, alloc := range allocs { + //XXX On Restore we give up on watching previous allocs because + // we need the local AllocRunners initialized first. We could + // add a second loop to initialize just the alloc watcher. 
+ prevAllocWatcher := allocwatcher.NoopPrevAlloc{} + c.configLock.RLock() arConf := &allocrunner.Config{ Alloc: alloc, @@ -769,6 +774,7 @@ func (c *Client) restoreState() error { StateUpdater: c, Consul: c.consulService, Vault: c.vaultClient, + PrevAllocWatcher: prevAllocWatcher, PluginLoader: c.config.PluginLoader, PluginSingletonLoader: c.config.PluginSingletonLoader, } From 9d2b28e9bec3c1f49b1e3a1a767a4cacd2416a5d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 29 Oct 2018 14:21:05 -0700 Subject: [PATCH 7/8] tests: fix usages of TestClient cleanup and mock driver --- nomad/client_alloc_endpoint_test.go | 33 ++++++------- nomad/client_fs_endpoint_test.go | 72 ++++++++++++++--------------- nomad/client_rpc_test.go | 4 +- nomad/client_stats_endpoint_test.go | 8 ++-- nomad/drainer_int_test.go | 6 +-- nomad/mock/mock.go | 2 +- 6 files changed, 63 insertions(+), 62 deletions(-) diff --git a/nomad/client_alloc_endpoint_test.go b/nomad/client_alloc_endpoint_test.go index 1df7485e3..2f434e32a 100644 --- a/nomad/client_alloc_endpoint_test.go +++ b/nomad/client_alloc_endpoint_test.go @@ -3,6 +3,7 @@ package nomad import ( "fmt" "testing" + "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" @@ -26,10 +27,10 @@ func TestClientAllocations_GarbageCollectAll_Local(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() @@ -188,11 +189,11 @@ func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) { testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} 
c.GCDiskUsageThreshold = 100.0 }) - defer c.Shutdown() + defer cleanup() testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() @@ -268,11 +269,11 @@ func TestClientAllocations_GarbageCollect_Local(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} c.GCDiskUsageThreshold = 100.0 }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -283,7 +284,7 @@ func TestClientAllocations_GarbageCollect_Local(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -417,11 +418,11 @@ func TestClientAllocations_GarbageCollect_Remote(t *testing.T) { testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} c.GCDiskUsageThreshold = 100.0 }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -432,7 +433,7 @@ func TestClientAllocations_GarbageCollect_Remote(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -539,10 +540,10 @@ func TestClientAllocations_Stats_Local(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -553,7 +554,7 @@ func 
TestClientAllocations_Stats_Local(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -688,10 +689,10 @@ func TestClientAllocations_Stats_Remote(t *testing.T) { testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -702,7 +703,7 @@ func TestClientAllocations_Stats_Remote(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ diff --git a/nomad/client_fs_endpoint_test.go b/nomad/client_fs_endpoint_test.go index bdfdd6409..e87e5a00f 100644 --- a/nomad/client_fs_endpoint_test.go +++ b/nomad/client_fs_endpoint_test.go @@ -31,10 +31,10 @@ func TestClientFS_List_Local(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -45,7 +45,7 @@ func TestClientFS_List_Local(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -183,10 +183,10 @@ func TestClientFS_List_Remote(t *testing.T) { testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = 
[]string{s2.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -197,7 +197,7 @@ func TestClientFS_List_Remote(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -300,10 +300,10 @@ func TestClientFS_Stat_Local(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -314,7 +314,7 @@ func TestClientFS_Stat_Local(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -452,10 +452,10 @@ func TestClientFS_Stat_Remote(t *testing.T) { testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node a := mock.Alloc() @@ -466,7 +466,7 @@ func TestClientFS_Stat_Remote(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, }, LogConfig: structs.DefaultLogConfig(), Resources: &structs.Resources{ @@ -719,10 +719,10 @@ func TestClientFS_Streaming_Local(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer 
cleanup() // Force an allocation onto the node expected := "Hello from the other side" @@ -734,7 +734,7 @@ func TestClientFS_Streaming_Local(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), @@ -851,10 +851,10 @@ func TestClientFS_Streaming_Local_Follow(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expectedBase := "Hello from the other side" @@ -868,7 +868,7 @@ func TestClientFS_Streaming_Local_Follow(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "20s", + "run_for": 2 * time.Second, "stdout_string": expectedBase, "stdout_repeat": repeat, "stdout_repeat_duration": 200 * time.Millisecond, @@ -995,10 +995,10 @@ func TestClientFS_Streaming_Remote_Server(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" @@ -1010,7 +1010,7 @@ func TestClientFS_Streaming_Remote_Server(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), @@ -1141,11 +1141,11 @@ func TestClientFS_Streaming_Remote_Region(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, 
func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} c.Region = "two" }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" @@ -1157,7 +1157,7 @@ func TestClientFS_Streaming_Remote_Region(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), @@ -1541,10 +1541,10 @@ func TestClientFS_Logs_Local(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" @@ -1556,7 +1556,7 @@ func TestClientFS_Logs_Local(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), @@ -1674,10 +1674,10 @@ func TestClientFS_Logs_Local_Follow(t *testing.T) { defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expectedBase := "Hello from the other side" @@ -1691,7 +1691,7 @@ func TestClientFS_Logs_Local_Follow(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "20s", + "run_for": 20 * time.Second, "stdout_string": expectedBase, "stdout_repeat": repeat, "stdout_repeat_duration": 200 * time.Millisecond, @@ -1819,10 +1819,10 @@ func TestClientFS_Logs_Remote_Server(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) - c := 
client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" @@ -1834,7 +1834,7 @@ func TestClientFS_Logs_Remote_Server(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), @@ -1966,11 +1966,11 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} c.Region = "two" }) - defer c.Shutdown() + defer cleanup() // Force an allocation onto the node expected := "Hello from the other side" @@ -1982,7 +1982,7 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) { Name: "web", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "2s", + "run_for": 2 * time.Second, "stdout_string": expected, }, LogConfig: structs.DefaultLogConfig(), diff --git a/nomad/client_rpc_test.go b/nomad/client_rpc_test.go index e1a75c596..33ca87baa 100644 --- a/nomad/client_rpc_test.go +++ b/nomad/client_rpc_test.go @@ -264,10 +264,10 @@ func TestNodeStreamingRpc_badEndpoint(t *testing.T) { defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s1.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() // Wait for the client to connect testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/client_stats_endpoint_test.go b/nomad/client_stats_endpoint_test.go index 66e42b059..78f667bd5 100644 --- a/nomad/client_stats_endpoint_test.go +++ 
b/nomad/client_stats_endpoint_test.go @@ -25,10 +25,10 @@ func TestClientStats_Stats_Local(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() testutil.WaitForResult(func() (bool, error) { nodes := s.connectedNodes() @@ -183,10 +183,10 @@ func TestClientStats_Stats_Remote(t *testing.T) { testutil.WaitForLeader(t, s2.RPC) codec := rpcClient(t, s2) - c := client.TestClient(t, func(c *config.Config) { + c, cleanup := client.TestClient(t, func(c *config.Config) { c.Servers = []string{s2.config.RPCAddr.String()} }) - defer c.Shutdown() + defer cleanup() testutil.WaitForResult(func() (bool, error) { nodes := s2.connectedNodes() diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 9c05bb46b..c955a844c 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -870,7 +870,6 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { // Test that transitions to force drain work. 
func TestDrainer_Batch_TransitionToForce(t *testing.T) { t.Parallel() - require := require.New(t) for _, inf := range []bool{true, false} { name := "Infinite" @@ -878,6 +877,7 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { name = "Deadline" } t.Run(name, func(t *testing.T) { + require := require.New(t) s1 := TestServer(t, nil) defer s1.Shutdown() codec := rpcClient(t, s1) @@ -948,12 +948,12 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { // Make sure the batch job isn't affected testutil.AssertUntil(500*time.Millisecond, func() (bool, error) { if err := checkAllocPromoter(errCh); err != nil { - return false, err + return false, fmt.Errorf("check alloc promoter error: %v", err) } allocs, err := state.AllocsByNode(nil, n1.ID) if err != nil { - return false, err + return false, fmt.Errorf("AllocsByNode error: %v", err) } for _, alloc := range allocs { if alloc.DesiredStatus != structs.AllocDesiredStatusRun { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 19a1f252f..3b90ee15d 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -264,7 +264,7 @@ func BatchJob() *structs.Job { Name: "worker", Driver: "mock_driver", Config: map[string]interface{}{ - "run_for": "500ms", + "run_for": 500 * time.Millisecond, }, Env: map[string]string{ "FOO": "bar", From 0b4e15c3661af3c354453d0f2f9f05f51e644996 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 29 Oct 2018 15:25:22 -0700 Subject: [PATCH 8/8] tests: more fixes due to api changes --- client/state/db_test.go | 29 ++++++++++++++++++++++++++--- command/agent/consul/int_test.go | 4 ++-- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/client/state/db_test.go b/client/state/db_test.go index 60d9489c0..ab50025e5 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -3,11 +3,13 @@ package state import ( "io/ioutil" "os" + "reflect" "testing" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/nomad/mock" 
"github.com/hashicorp/nomad/nomad/structs" + "github.com/kr/pretty" "github.com/stretchr/testify/require" ) @@ -70,6 +72,13 @@ func TestStateDB_Allocations(t *testing.T) { // Put allocations alloc1 := mock.Alloc() alloc2 := mock.BatchAlloc() + + //XXX Sadly roundtripping allocs loses time.Duration type + // information from the Config map[string]interface{}. As + // the mock driver itself will unmarshal run_for into the + // proper type, we can safely ignore it here. + delete(alloc2.Job.TaskGroups[0].Tasks[0].Config, "run_for") + require.NoError(db.PutAllocation(alloc1)) require.NoError(db.PutAllocation(alloc2)) @@ -78,8 +87,22 @@ func TestStateDB_Allocations(t *testing.T) { require.NoError(err) require.NotNil(allocs) require.Len(allocs, 2) - require.Contains(allocs, alloc1) - require.Contains(allocs, alloc2) + for _, a := range allocs { + switch a.ID { + case alloc1.ID: + if !reflect.DeepEqual(a, alloc1) { + pretty.Ldiff(t, a, alloc1) + t.Fatalf("alloc %q unequal", a.ID) + } + case alloc2.ID: + if !reflect.DeepEqual(a, alloc2) { + pretty.Ldiff(t, a, alloc2) + t.Fatalf("alloc %q unequal", a.ID) + } + default: + t.Fatalf("unexpected alloc id %q", a.ID) + } + } require.NotNil(errs) require.Empty(errs) @@ -98,7 +121,7 @@ func TestStateDB_Allocations(t *testing.T) { // Deleting a nonexistent alloc is a noop require.NoError(db.DeleteAllocationBucket("asdf")) - allocs, errs, err = db.GetAllAllocations() + allocs, _, err = db.GetAllAllocations() require.NoError(err) require.NotNil(allocs) require.Len(allocs, 3) diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index c279131b9..405a21854 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -27,8 +27,8 @@ type mockUpdater struct { logger log.Logger } -func (m *mockUpdater) TaskStateUpdated(task string, state *structs.TaskState) { - m.logger.Named("test.updater").Debug("update", "task", task, "state", state) +func (m *mockUpdater) TaskStateUpdated() { + 
m.logger.Named("mock.updater").Debug("Update!") } // TODO Fix