Merge pull request #4803 from hashicorp/b-leader-fixes

AR Fixes: task leader handling, restoring, state updating, AR.Destroy deadlocks
This commit is contained in:
Michael Schurter
2018-10-29 17:38:59 -05:00
committed by GitHub
34 changed files with 1352 additions and 451 deletions

View File

@@ -17,11 +17,11 @@ func TestClient_ACL_resolveTokenValue(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
c.ACLEnabled = true
})
defer c1.Shutdown()
defer cleanup()
// Create a policy / token
policy := mock.ACLPolicy()
@@ -66,11 +66,11 @@ func TestClient_ACL_resolvePolicies(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
c.ACLEnabled = true
})
defer c1.Shutdown()
defer cleanup()
// Create a policy / token
policy := mock.ACLPolicy()
@@ -106,10 +106,10 @@ func TestClient_ACL_ResolveToken_Disabled(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
// Should always get nil when disabled
aclObj, err := c1.ResolveToken("blah")
@@ -122,11 +122,11 @@ func TestClient_ACL_ResolveToken(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
c.ACLEnabled = true
})
defer c1.Shutdown()
defer cleanup()
// Create a policy / token
policy := mock.ACLPolicy()

View File

@@ -16,7 +16,8 @@ import (
func TestAllocations_GarbageCollectAll(t *testing.T) {
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
client, cleanup := TestClient(t, nil)
defer cleanup()
req := &nstructs.NodeSpecificRequest{}
var resp nstructs.GenericResponse
@@ -29,11 +30,11 @@ func TestAllocations_GarbageCollectAll_ACL(t *testing.T) {
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
defer cleanup()
// Try request without a token and expect failure
{
@@ -79,9 +80,10 @@ func TestAllocations_GarbageCollect(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.GCDiskUsageThreshold = 100.0
})
defer cleanup()
a := mock.Alloc()
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
@@ -122,11 +124,11 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) {
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
defer cleanup()
// Try request without a token and expect failure
{
@@ -178,7 +180,8 @@ func TestAllocations_Stats(t *testing.T) {
t.Skip("missing exec driver plugin implementation")
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
client, cleanup := TestClient(t, nil)
defer cleanup()
a := mock.Alloc()
require.Nil(client.addAlloc(a, ""))
@@ -213,11 +216,11 @@ func TestAllocations_Stats_ACL(t *testing.T) {
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
defer cleanup()
// Try request without a token and expect failure
{

View File

@@ -34,9 +34,19 @@ type allocRunner struct {
clientConfig *config.Config
// stateUpdater is used to emit updated task state
// stateUpdater is used to emit updated alloc state
stateUpdater cinterfaces.AllocStateHandler
// taskStateUpdatedCh is ticked whenever task state has changed. Must
// be buffered (capacity 1) to allow nonblocking notification of state updates while
// the goroutine is already processing a previous update.
taskStateUpdatedCh chan struct{}
// taskStateUpdateHandlerCh is closed when the task state handling
// goroutine exits. It is unsafe to destroy the local allocation state
// before this goroutine exits.
taskStateUpdateHandlerCh chan struct{}
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
@@ -48,8 +58,16 @@ type allocRunner struct {
waitCh chan struct{}
// destroyed is true when the Run() loop has exited, postrun hooks have
// run, and alloc runner has been destroyed
destroyed bool
// run, and alloc runner has been destroyed. Must acquire destroyedLock
// to access.
destroyed bool
// runLaunched is true if Run() has been called. If this is false
// Destroy() does not wait on tasks to shutdown as they are not
// running. Must acquire destroyedLock to access.
runLaunched bool
// destroyedLock guards destroyed and runLaunched, and serializes Destroy() calls.
destroyedLock sync.Mutex
// Alloc captures the allocation being run.
@@ -70,8 +88,7 @@ type allocRunner struct {
runnerHooks []interfaces.RunnerHook
// tasks are the set of task runners
tasks map[string]*taskrunner.TaskRunner
tasksLock sync.RWMutex
tasks map[string]*taskrunner.TaskRunner
// allocBroadcaster sends client allocation updates to all listeners
allocBroadcaster *cstructs.AllocBroadcaster
@@ -94,19 +111,21 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
}
ar := &allocRunner{
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
allocBroadcaster: cstructs.NewAllocBroadcaster(),
prevAllocWatcher: config.PrevAllocWatcher,
pluginSingletonLoader: config.PluginSingletonLoader,
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
taskStateUpdatedCh: make(chan struct{}, 1),
taskStateUpdateHandlerCh: make(chan struct{}),
allocBroadcaster: cstructs.NewAllocBroadcaster(),
prevAllocWatcher: config.PrevAllocWatcher,
pluginSingletonLoader: config.PluginSingletonLoader,
}
// Create the logger based on the allocation ID
@@ -157,32 +176,37 @@ func (ar *allocRunner) WaitCh() <-chan struct{} {
return ar.waitCh
}
// XXX How does alloc Restart work
// Run is the main goroutine that executes all the tasks.
func (ar *allocRunner) Run() {
// Close the wait channel
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
if ar.destroyed {
// Run should not be called after Destroy is called. This is a
// programming error.
ar.logger.Error("alloc destroyed; cannot run")
return
}
ar.runLaunched = true
go ar.runImpl()
}
func (ar *allocRunner) runImpl() {
// Close the wait channel on return
defer close(ar.waitCh)
var taskWaitCh <-chan struct{}
// Start the task state update handler
go ar.handleTaskStateUpdates()
// Run the prestart hooks
// XXX Equivalent to TR.Prestart hook
if err := ar.prerun(); err != nil {
ar.logger.Error("prerun failed", "error", err)
goto POST
}
// Run the runners
taskWaitCh = ar.runImpl()
MAIN:
for {
select {
case <-taskWaitCh:
// TaskRunners have all exited
break MAIN
}
}
// Run the runners and block until they exit
<-ar.runTasks()
POST:
// Run the postrun hooks
@@ -192,8 +216,8 @@ POST:
}
}
// runImpl is used to run the runners.
func (ar *allocRunner) runImpl() <-chan struct{} {
// runTasks is used to run the task runners.
func (ar *allocRunner) runTasks() <-chan struct{} {
for _, task := range ar.tasks {
go task.Run()
}
@@ -244,71 +268,146 @@ func (ar *allocRunner) Restore() error {
}
// TaskStateUpdated is called by TaskRunner when a task's state has been
// updated. This hook is used to compute changes to the alloc's ClientStatus
// and to update the server with the new state.
func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) {
// If a task is dead, we potentially want to kill other tasks in the group
if state.State == structs.TaskStateDead {
// Find all tasks that are not the one that is dead and check if the one
// that is dead is a leader
var otherTaskRunners []*taskrunner.TaskRunner
var otherTaskNames []string
leader := false
// updated. It does not process the update synchronously but instead notifies a
// goroutine that the state has changed. Since processing the state change may
// cause the task to be killed (thus changing its state again), it cannot be done
// synchronously as it would cause a deadlock due to reentrancy.
//
// The goroutine is used to compute changes to the alloc's ClientStatus and to
// update the server with the new state.
func (ar *allocRunner) TaskStateUpdated() {
select {
case ar.taskStateUpdatedCh <- struct{}{}:
default:
// already pending updates
}
}
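
The two functions above implement a coalescing notification: taskStateUpdatedCh is buffered with a capacity of one, so TaskStateUpdated never blocks and any number of updates that arrive while the handler is busy collapse into a single pending signal. A minimal, self-contained sketch of the pattern (all names here are illustrative, not part of this change):

package main

import (
	"fmt"
	"time"
)

// notifier demonstrates the nonblocking, coalescing notification used by
// TaskStateUpdated/handleTaskStateUpdates. Illustrative sketch only.
type notifier struct {
	updatedCh chan struct{} // buffered, capacity 1
}

func newNotifier() *notifier {
	return &notifier{updatedCh: make(chan struct{}, 1)}
}

// notify never blocks; if a notification is already pending it is coalesced.
func (n *notifier) notify() {
	select {
	case n.updatedCh <- struct{}{}:
	default:
		// already a pending update
	}
}

// handle processes notifications one at a time.
func (n *notifier) handle(stop <-chan struct{}) {
	for {
		select {
		case <-n.updatedCh:
			fmt.Println("processing coalesced state update")
		case <-stop:
			return
		}
	}
}

func main() {
	n := newNotifier()
	stop := make(chan struct{})
	go n.handle(stop)

	// Rapid-fire notifications never block the sender.
	for i := 0; i < 10; i++ {
		n.notify()
	}
	time.Sleep(100 * time.Millisecond)
	close(stop)
}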
// handleTaskStateUpdates must be run in a goroutine as it monitors
// taskStateUpdatedCh for task state update notifications and processes task
// states.
//
// Processing task state updates must be done in a goroutine as it may have to
// kill tasks, which causes further task state updates.
func (ar *allocRunner) handleTaskStateUpdates() {
defer close(ar.taskStateUpdateHandlerCh)
for done := false; !done; {
select {
case <-ar.taskStateUpdatedCh:
case <-ar.waitCh:
// Tasks have exited, run once more to ensure final
// states are collected.
done = true
}
// Set with the appropriate event if task runners should be
// killed.
var killEvent *structs.TaskEvent
// If task runners should be killed, this is set to the name of
// the task at fault.
killTask := ""
// True if task runners should be killed because a leader
// failed (informational).
leaderFailed := false
// Task state has been updated; gather the state of the other tasks
trNum := len(ar.tasks)
liveRunners := make([]*taskrunner.TaskRunner, 0, trNum)
states := make(map[string]*structs.TaskState, trNum)
for name, tr := range ar.tasks {
if name != taskName {
otherTaskRunners = append(otherTaskRunners, tr)
otherTaskNames = append(otherTaskNames, name)
} else if tr.Task().Leader {
leader = true
}
}
// If the task failed, we should kill all the other tasks in the task group.
if state.Failed {
if len(otherTaskRunners) > 0 {
ar.logger.Debug("task failure, destroying all tasks", "failed_task", taskName, "destroying", otherTaskNames)
}
for _, tr := range otherTaskRunners {
tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
}
} else if leader {
if len(otherTaskRunners) > 0 {
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", taskName, "destroying", otherTaskNames)
}
// If the task was a leader task we should kill all the other tasks.
for _, tr := range otherTaskRunners {
tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskLeaderDead))
}
}
}
// Gather the state of the other tasks
ar.tasksLock.RLock()
states := make(map[string]*structs.TaskState, len(ar.tasks))
for name, tr := range ar.tasks {
if name == taskName {
state := tr.TaskState()
states[name] = state
} else {
states[name] = tr.TaskState()
// Capture live task runners in case we need to kill them
if state.State != structs.TaskStateDead {
liveRunners = append(liveRunners, tr)
continue
}
// Task is dead, determine if other tasks should be killed
if state.Failed {
// Only set failed event if no event has been
// set yet to give dead leaders priority.
if killEvent == nil {
killTask = name
killEvent = structs.NewTaskEvent(structs.TaskSiblingFailed).
SetFailedSibling(name)
}
} else if tr.IsLeader() {
killEvent = structs.NewTaskEvent(structs.TaskLeaderDead)
leaderFailed = true
killTask = name
}
}
// If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 {
// Log kill reason
if leaderFailed {
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
} else {
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}
ar.killTasks()
}
// Get the client allocation
calloc := ar.clientAlloc(states)
// Update the server
ar.stateUpdater.AllocStateUpdated(calloc)
// Broadcast client alloc to listeners
ar.allocBroadcaster.Send(calloc)
}
ar.tasksLock.RUnlock()
}
// Get the client allocation
calloc := ar.clientAlloc(states)
// killTasks kills all task runners, the leader (if any) first. Errors are
// logged except taskrunner.ErrTaskNotRunning, which is ignored.
func (ar *allocRunner) killTasks() {
// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
continue
}
// Update the server
ar.stateUpdater.AllocStateUpdated(calloc)
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}
break
}
// Broadcast client alloc to listeners
ar.allocBroadcaster.Send(calloc)
// Kill the rest concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
if tr.IsLeader() {
continue
}
wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}
}(name, tr)
}
wg.Wait()
}
// clientAlloc takes in the task states and returns an Allocation populated
// with Client specific fields
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation {
ar.stateLock.RLock()
defer ar.stateLock.RUnlock()
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
// store task states for AllocState to expose
ar.state.TaskStates = taskStates
@@ -405,12 +504,10 @@ func (ar *allocRunner) AllocState() *state.State {
// If TaskStateUpdated has not been called yet, ar.state.TaskStates
// won't be set as it is not the canonical source of TaskStates.
if len(state.TaskStates) == 0 {
ar.tasksLock.RLock()
ar.state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks))
for k, tr := range ar.tasks {
state.TaskStates[k] = tr.TaskState()
}
ar.tasksLock.RUnlock()
}
return state
@@ -422,18 +519,30 @@ func (ar *allocRunner) AllocState() *state.State {
// If there is already a pending update it will be discarded and replaced by
// the latest update.
func (ar *allocRunner) Update(update *structs.Allocation) {
// Detect Stop updates
stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus()
// Update ar.alloc
ar.setAlloc(update)
// Run hooks
if err := ar.update(update); err != nil {
ar.logger.Error("error running update hooks", "error", err)
// Run update hooks if not stopping or dead
if !update.TerminalStatus() {
if err := ar.update(update); err != nil {
ar.logger.Error("error running update hooks", "error", err)
}
}
// Update task runners
for _, tr := range ar.tasks {
tr.Update(update)
}
// If alloc is being terminated, kill all tasks, leader first
if stopping {
ar.killTasks()
}
}
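
For reference, a stop arrives as an ordinary allocation update whose desired status is terminal; Update above detects the transition and kills tasks leader-first. A hedged usage sketch (stopAlloc is an illustrative helper, not part of this change):

// stopAlloc shows how a caller signals a stop; illustrative only.
func stopAlloc(ar *allocRunner, alloc *structs.Allocation) {
	// A stop is just an update with a terminal desired status.
	update := alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop

	// Update stores the new alloc, skips update hooks for terminal
	// updates, forwards the update to each task runner, and then calls
	// killTasks so the leader stops before its followers.
	ar.Update(update)
}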
func (ar *allocRunner) Listener() *cstructs.AllocListener {
@@ -446,37 +555,40 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
// Stop tasks
ar.tasksLock.RLock()
for name, tr := range ar.tasks {
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
if err == taskrunner.ErrTaskNotRunning {
ar.logger.Trace("task not running", "task_name", name)
} else {
ar.logger.Warn("failed to kill task", "error", err, "task_name", name)
}
}
ar.destroyedLock.Lock()
if ar.destroyed {
// Only destroy once
ar.destroyedLock.Unlock()
return
}
ar.tasksLock.RUnlock()
defer ar.destroyedLock.Unlock()
// Wait for tasks to exit and postrun hooks to finish
<-ar.waitCh
// Stop any running tasks
ar.killTasks()
// Wait for tasks to exit and postrun hooks to finish (if they ran at all)
if ar.runLaunched {
<-ar.waitCh
}
// Run destroy hooks
if err := ar.destroy(); err != nil {
ar.logger.Warn("error running destroy hooks", "error", err)
}
// Wait for task state update handler to exit before removing local
// state if Run() ran at all.
if ar.runLaunched {
<-ar.taskStateUpdateHandlerCh
}
// Cleanup state db
if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil {
ar.logger.Warn("failed to delete allocation state", "error", err)
}
// Mark alloc as destroyed
ar.destroyedLock.Lock()
ar.destroyed = true
ar.destroyedLock.Unlock()
}
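
The runLaunched flag is what breaks the old Destroy deadlock: if Run was never called, nothing will ever close waitCh, so Destroy must not wait on it. A simplified, standalone sketch of that guard (types and names are illustrative, not the actual allocRunner):

package main

import (
	"fmt"
	"sync"
)

// runner is a stripped-down stand-in for allocRunner; illustrative only.
type runner struct {
	mu          sync.Mutex
	runLaunched bool
	waitCh      chan struct{}
}

func (r *runner) Run() {
	r.mu.Lock()
	r.runLaunched = true
	r.mu.Unlock()
	go func() {
		// ... run tasks ...
		close(r.waitCh) // only ever closed if Run was called
	}()
}

func (r *runner) Destroy() {
	r.mu.Lock()
	launched := r.runLaunched
	r.mu.Unlock()

	// Waiting unconditionally would deadlock when Run never started,
	// since nothing would close waitCh.
	if launched {
		<-r.waitCh
	}
	fmt.Println("destroyed")
}

func main() {
	// Destroy without Run returns immediately instead of hanging.
	r := &runner{waitCh: make(chan struct{})}
	r.Destroy()

	// Destroy after Run waits for the run goroutine to finish.
	r2 := &runner{waitCh: make(chan struct{})}
	r2.Run()
	r2.Destroy()
}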
// IsDestroyed returns true if the alloc runner has been destroyed (stopped and
@@ -514,9 +626,6 @@ func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter {
// LatestAllocStats returns the latest stats for an allocation. If taskFilter
// is set, only stats for that task -- if it exists -- are returned.
func (ar *allocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
ar.tasksLock.RLock()
defer ar.tasksLock.RUnlock()
astat := &cstructs.AllocResourceUsage{
Tasks: make(map[string]*cstructs.TaskResourceUsage, len(ar.tasks)),
ResourceUsage: &cstructs.ResourceUsage{

View File

@@ -40,7 +40,6 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents
a.ar.stateLock.Unlock()
// If deployment is unhealthy emit task events explaining why
a.ar.tasksLock.RLock()
if !healthy && isDeploy {
for task, event := range trackerTaskEvents {
if tr, ok := a.ar.tasks[task]; ok {
@@ -56,7 +55,6 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents
for name, tr := range a.ar.tasks {
states[name] = tr.TaskState()
}
a.ar.tasksLock.RUnlock()
// Build the client allocation
calloc := a.ar.clientAlloc(states)

View File

@@ -1,48 +1,353 @@
package allocrunner
import (
"fmt"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulapi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/catalog"
"github.com/hashicorp/nomad/plugins/shared/loader"
"github.com/hashicorp/nomad/plugins/shared/singleton"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// MockStateUpdater implements the AllocStateHandler interface and records
// alloc updates.
type MockStateUpdater struct {
Updates []*structs.Allocation
mu sync.Mutex
}
// AllocStateUpdated implements the AllocStateHandler interface and records an
// alloc update.
func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
m.mu.Lock()
m.Updates = append(m.Updates, alloc)
m.mu.Unlock()
}
// Last returns a copy of the last alloc update (or nil). Safe for concurrent
// access with updates.
func (m *MockStateUpdater) Last() *structs.Allocation {
m.mu.Lock()
defer m.mu.Unlock()
n := len(m.Updates)
if n == 0 {
return nil
}
return m.Updates[n-1].Copy()
}
// Reset resets the recorded alloc updates.
func (m *MockStateUpdater) Reset() {
m.mu.Lock()
m.Updates = nil
m.mu.Unlock()
}
// testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop
// versions of dependencies along with a cleanup func.
func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) {
logger := testlog.HCLogger(t)
pluginLoader := catalog.TestPluginLoader(t)
clientConf, cleanup := config.TestClientConfig(t)
conf := &Config{
// Copy the alloc in case the caller edits and reuses it
Alloc: alloc.Copy(),
Logger: logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader),
}
return conf, cleanup
}
// TestAllocRunner_AllocState_Initialized asserts that TaskStates returned by
// AllocState() are initialized even before the AllocRunner has run.
func TestAllocRunner_AllocState_Initialized(t *testing.T) {
t.Skip("missing exec driver plugin implementation")
t.Parallel()
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
conf := &Config{
Alloc: alloc,
Logger: logger,
ClientConfig: config.TestClientConfig(),
StateDB: state.NoopDB{},
Consul: nil,
Vault: nil,
StateUpdater: nil,
PrevAllocWatcher: nil,
PluginSingletonLoader: &loader.MockCatalog{},
}
pluginLoader := catalog.TestPluginLoader(t)
conf.PluginSingletonLoader = singleton.NewSingletonLoader(logger, pluginLoader)
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
allocState := ar.AllocState()
require.NotNil(t, allocState)
require.NotNil(t, allocState.TaskStates[alloc.Job.TaskGroups[0].Tasks[0].Name])
require.NotNil(t, allocState.TaskStates[conf.Alloc.Job.TaskGroups[0].Tasks[0].Name])
}
// TestAllocRunner_TaskLeader_KillTG asserts that when a leader task dies the
// entire task group is killed.
func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
t.Parallel()
alloc := mock.BatchAlloc()
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
// Create two tasks in the task group
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "task1"
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Millisecond
task.Config = map[string]interface{}{
"run_for": 10 * time.Second,
}
task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "task2"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": 1 * time.Second,
}
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
alloc.TaskResources[task2.Name] = task2.Resources
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer ar.Destroy()
ar.Run()
// Wait for all tasks to be killed
upd := conf.StateUpdater.(*MockStateUpdater)
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}
// Task1 should be killed because Task2 exited
state1 := last.TaskStates[task.Name]
if state1.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
}
if state1.FinishedAt.IsZero() || state1.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if len(state1.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}
found := false
for _, e := range state1.Events {
if e.Type == structs.TaskLeaderDead {
found = true
}
}
if !found {
return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
}
// Task Two should be dead
state2 := last.TaskStates[task2.Name]
if state2.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
}
if state2.FinishedAt.IsZero() || state2.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
// leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
// Create 3 tasks in the task group
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": 10 * time.Second,
}
task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": 10 * time.Second,
}
task3 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task3.Name = "follower2"
task3.Driver = "mock_driver"
task3.Config = map[string]interface{}{
"run_for": 10 * time.Second,
}
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2, task3)
alloc.TaskResources[task2.Name] = task2.Resources
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer ar.Destroy()
ar.Run()
// Wait for tasks to start
upd := conf.StateUpdater.(*MockStateUpdater)
last := upd.Last()
testutil.WaitForResult(func() (bool, error) {
last = upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if n := len(last.TaskStates); n != 3 {
return false, fmt.Errorf("Not enough task states (want: 3; found %d)", n)
}
for name, state := range last.TaskStates {
if state.State != structs.TaskStateRunning {
return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Reset updates
upd.Reset()
// Stop alloc
update := alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)
// Wait for tasks to stop
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower1"].FinishedAt.UnixNano() {
return false, fmt.Errorf("expected leader to finish before follower1: %s >= %s",
last.TaskStates["leader"].FinishedAt, last.TaskStates["follower1"].FinishedAt)
}
if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower2"].FinishedAt.UnixNano() {
return false, fmt.Errorf("expected leader to finish before follower2: %s >= %s",
last.TaskStates["leader"].FinishedAt, last.TaskStates["follower2"].FinishedAt)
}
return true, nil
}, func(err error) {
last := upd.Last()
for name, state := range last.TaskStates {
t.Logf("%s: %s", name, state.State)
}
t.Fatalf("err: %v", err)
})
}
// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a
// restored task group whose leader failed before the restore, the dead leader
// is not stopped again since it is no longer running.
// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
// Create a leader and follower task in the task group
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Second
task.Config = map[string]interface{}{
"run_for": "10s",
}
task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.KillTimeout = 10 * time.Millisecond
task2.Config = map[string]interface{}{
"run_for": "10s",
}
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
alloc.TaskResources[task2.Name] = task2.Resources
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
// Use a memory backed statedb
conf.StateDB = state.NewMemDB()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer ar.Destroy()
// Mimic Nomad exiting before the dying leader was able to stop the other tasks.
ar.tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled))
ar.tasks["follower1"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
// Create a new AllocRunner to test RestoreState and Run
ar2, err := NewAllocRunner(conf)
require.NoError(t, err)
defer ar2.Destroy()
if err := ar2.Restore(); err != nil {
t.Fatalf("error restoring state: %v", err)
}
ar2.Run()
// Wait for tasks to be stopped because leader is dead
testutil.WaitForResult(func() (bool, error) {
alloc := ar2.Alloc()
for task, state := range alloc.TaskStates {
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("Task %q should be dead: %v", task, state.State)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Make sure it GCs properly
ar2.Destroy()
select {
case <-ar2.WaitCh():
// exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for AR to GC")
}
}
/*

View File

@@ -2,8 +2,6 @@ package interfaces
import (
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
)
@@ -24,8 +22,9 @@ type AllocRunner interface {
// TaskStateHandler exposes a handler to be called when a task's state changes
type TaskStateHandler interface {
// TaskStateUpdated is used to emit updated task state
TaskStateUpdated(task string, state *structs.TaskState)
// TaskStateUpdated is used to notify the alloc runner about task state
// changes.
TaskStateUpdated()
}
// AllocStatsReporter gives access to the latest resource usage from the

View File

@@ -25,6 +25,23 @@ func NewLocalState() *LocalState {
}
}
// Canonicalize ensures LocalState is in a consistent state by initializing
// Hooks and removing nil HookState entries. Useful for cleaning freshly
// unmarshalled state, which may be inconsistent.
func (s *LocalState) Canonicalize() {
if s.Hooks == nil {
// Hooks is nil, create it
s.Hooks = make(map[string]*HookState)
} else {
for k, v := range s.Hooks {
// Remove invalid nil entries from Hooks map
if v == nil {
delete(s.Hooks, k)
}
}
}
}
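
Canonicalize is intended for state that has just been decoded from disk, where an older or partially written record may have left Hooks nil or individual entries nil. A hedged sketch of the intended call site (restoreLocalState and its decode parameter are illustrative, not the actual state DB code):

// restoreLocalState is an illustrative sketch; decode stands in for
// whatever deserialization the state DB actually performs.
func restoreLocalState(raw []byte, decode func([]byte, interface{}) error) (*LocalState, error) {
	ls := NewLocalState()
	if err := decode(raw, ls); err != nil {
		return nil, err
	}
	// Repair anything the stored record left inconsistent: allocate a
	// nil Hooks map and drop nil hook entries.
	ls.Canonicalize()
	return ls, nil
}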
// Copy should be called with the lock held
func (s *LocalState) Copy() *LocalState {
// Create a copy

View File

@@ -52,10 +52,11 @@ const (
)
type TaskRunner struct {
// allocID and taskName are immutable so these fields may be accessed
// without locks
allocID string
taskName string
// allocID, taskName, and taskLeader are immutable so these fields may
// be accessed without locks
allocID string
taskName string
taskLeader bool
alloc *structs.Allocation
allocLock sync.Mutex
@@ -170,9 +171,6 @@ type Config struct {
// VaultClient is the client to use to derive and renew Vault tokens
VaultClient vaultclient.VaultClient
// LocalState is optionally restored task state
LocalState *state.LocalState
// StateDB is used to store and restore state.
StateDB cstate.StateDB
@@ -196,6 +194,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
config.ClientConfig.Region,
)
// Initialize state from alloc if it is set
tstate := structs.NewTaskState()
if ts := config.Alloc.TaskStates[config.Task.Name]; ts != nil {
tstate = ts.Copy()
}
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
@@ -203,11 +207,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
consulClient: config.Consul,
vaultClient: config.VaultClient,
state: config.Alloc.TaskStates[config.Task.Name].Copy(),
localState: config.LocalState,
state: tstate,
localState: state.NewLocalState(),
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
ctx: trCtx,
@@ -228,9 +233,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
}
tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type)
// Initialize the task state
tr.initState()
// Get the driver
if err := tr.initDriver(); err != nil {
tr.logger.Error("failed to create driver", "error", err)
@@ -246,17 +248,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
return tr, nil
}
func (tr *TaskRunner) initState() {
if tr.state == nil {
tr.state = &structs.TaskState{
State: structs.TaskStatePending,
}
}
if tr.localState == nil {
tr.localState = state.NewLocalState()
}
}
func (tr *TaskRunner) initLabels() {
alloc := tr.Alloc()
tr.baseLabels = []metrics.Label{
@@ -398,15 +389,6 @@ func (tr *TaskRunner) handleUpdates() {
return
}
if tr.Alloc().TerminalStatus() {
// Terminal update: kill TaskRunner and let Run execute postrun hooks
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
tr.logger.Warn("error stopping task", "error", err)
}
continue
}
// Non-terminal update; run hooks
tr.updateHooks()
}
@@ -597,10 +579,6 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
}
}
// XXX If the objects don't exist because the client shut down before the task
// runner ever saved state, then we should treat it as a new task runner and not
// return an error
//
// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
// but before Run so no locks need to be acquired.
func (tr *TaskRunner) Restore() error {
@@ -609,33 +587,35 @@ func (tr *TaskRunner) Restore() error {
return err
}
tr.localState = ls
tr.state = ts
if ls != nil {
ls.Canonicalize()
tr.localState = ls
}
if ts != nil {
ts.Canonicalize()
tr.state = ts
}
return nil
}
// UpdateState sets the task runner's task state and triggers a server
// update.
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
tr.logger.Debug("setting task state", "state", state, "event", event.Type)
tr.logger.Trace("setting task state", "state", state, "event", event.Type)
// Update the local state
stateCopy := tr.setStateLocal(state, event)
tr.setStateLocal(state, event)
// Notify the alloc runner of the transition
tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy)
tr.stateUpdater.TaskStateUpdated()
}
// setStateLocal updates the local in-memory state and persists a copy to
// disk.
func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *structs.TaskState {
func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
//XXX REMOVE ME AFTER TESTING
if state == "" {
panic("UpdateState must not be called with an empty state")
}
// Update the task state
oldState := tr.state.State
taskState := tr.state
@@ -687,8 +667,6 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str
// try to persist it again.
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
}
return tr.state.Copy()
}
// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
@@ -709,7 +687,7 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
}
// Notify the alloc runner of the event
tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy())
tr.stateUpdater.TaskStateUpdated()
}
// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
@@ -773,8 +751,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
// Update tr.alloc
tr.setAlloc(update)
// Trigger update hooks
tr.triggerUpdateHooks()
// Trigger update hooks if not terminal
if !update.TerminalStatus() {
tr.triggerUpdateHooks()
}
}
// triggerUpdate if there isn't already an update pending. Should be called

View File

@@ -16,6 +16,11 @@ func (tr *TaskRunner) setAlloc(updated *structs.Allocation) {
tr.allocLock.Unlock()
}
// IsLeader returns true if this task is the leader of its task group.
func (tr *TaskRunner) IsLeader() bool {
return tr.taskLeader
}
func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()

View File

@@ -101,8 +101,11 @@ func (tr *TaskRunner) prestart() error {
TaskEnv: tr.envBuilder.Build(),
}
var origHookState *state.HookState
tr.localStateLock.RLock()
origHookState := tr.localState.Hooks[name]
if tr.localState.Hooks != nil {
origHookState = tr.localState.Hooks[name]
}
tr.localStateLock.RUnlock()
if origHookState != nil && origHookState.PrestartDone {
tr.logger.Trace("skipping done prestart hook", "name", pre.Name())

View File

@@ -760,6 +760,11 @@ func (c *Client) restoreState() error {
var mErr multierror.Error
for _, alloc := range allocs {
//XXX On Restore we give up on watching previous allocs because
// we need the local AllocRunners initialized first. We could
// add a second loop to initialize just the alloc watcher.
prevAllocWatcher := allocwatcher.NoopPrevAlloc{}
c.configLock.RLock()
arConf := &allocrunner.Config{
Alloc: alloc,
@@ -769,6 +774,7 @@ func (c *Client) restoreState() error {
StateUpdater: c,
Consul: c.consulService,
Vault: c.vaultClient,
PrevAllocWatcher: prevAllocWatcher,
PluginLoader: c.config.PluginLoader,
PluginSingletonLoader: c.config.PluginSingletonLoader,
}
@@ -805,7 +811,7 @@ func (c *Client) restoreState() error {
// All allocs restored successfully, run them!
c.allocLock.Lock()
for _, ar := range c.allocs {
go ar.Run()
ar.Run()
}
c.allocLock.Unlock()
@@ -1933,7 +1939,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
// Store the alloc runner.
c.allocs[alloc.ID] = ar
go ar.Run()
ar.Run()
return nil
}

View File

@@ -14,7 +14,8 @@ import (
func TestClientStats_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
client, cleanup := TestClient(t, nil)
defer cleanup()
req := &nstructs.NodeSpecificRequest{}
var resp structs.ClientStatsResponse
@@ -30,11 +31,11 @@ func TestClientStats_Stats_ACL(t *testing.T) {
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
defer cleanup()
// Try request without a token and expect failure
{

View File

@@ -37,7 +37,8 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
func TestClient_StartStop(t *testing.T) {
t.Parallel()
client := TestClient(t, nil)
client, cleanup := TestClient(t, nil)
defer cleanup()
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
@@ -49,10 +50,11 @@ func TestClient_BaseLabels(t *testing.T) {
t.Parallel()
assert := assert.New(t)
client := TestClient(t, nil)
client, cleanup := TestClient(t, nil)
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
defer cleanup()
// directly invoke this function, as otherwise this will fail on a CI build
// due to a race condition
@@ -74,10 +76,10 @@ func TestClient_RPC(t *testing.T) {
s1, addr := testServer(t, nil)
defer s1.Shutdown()
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
defer cleanup()
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
@@ -94,10 +96,10 @@ func TestClient_RPC_FireRetryWatchers(t *testing.T) {
s1, addr := testServer(t, nil)
defer s1.Shutdown()
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
defer cleanup()
watcher := c1.rpcRetryWatcher()
@@ -122,10 +124,10 @@ func TestClient_RPC_Passthrough(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
@@ -140,8 +142,8 @@ func TestClient_RPC_Passthrough(t *testing.T) {
func TestClient_Fingerprint(t *testing.T) {
t.Parallel()
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Ensure we are fingerprinting
testutil.WaitForResult(func() (bool, error) {
@@ -162,13 +164,13 @@ func TestClient_Fingerprint_Periodic(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
driver.ShutdownPeriodicAfter: "true",
driver.ShutdownPeriodicDuration: "1",
}
})
defer c1.Shutdown()
defer cleanup()
node := c1.config.Node
{
@@ -251,10 +253,10 @@ func TestClient_MixedTLS(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
defer cleanup()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
@@ -301,7 +303,7 @@ func TestClient_BadTLS(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
@@ -312,7 +314,7 @@ func TestClient_BadTLS(t *testing.T) {
KeyFile: badkey,
}
})
defer c1.Shutdown()
defer cleanup()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
@@ -339,10 +341,10 @@ func TestClient_Register(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
@@ -373,10 +375,10 @@ func TestClient_Heartbeat(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
@@ -406,10 +408,10 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
// Wait until the node is ready
waitTilNodeReady(c1, t)
@@ -457,10 +459,10 @@ func TestClient_WatchAllocs(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
// Wait until the node is ready
waitTilNodeReady(c1, t)
@@ -552,11 +554,11 @@ func TestClient_SaveRestoreState(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
// Wait until the node is ready
waitTilNodeReady(c1, t)
@@ -670,10 +672,10 @@ func TestClient_BlockedAllocations(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
defer c1.Shutdown()
defer cleanup()
// Wait for the node to be ready
state := s1.State()
@@ -781,10 +783,10 @@ func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
})
defer c.Shutdown()
defer cleanup()
alloc := mock.Alloc()
validToken, err := structs.GenerateMigrateToken(alloc.ID, c.secretNodeID())
@@ -797,10 +799,10 @@ func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
})
defer c.Shutdown()
defer cleanup()
assert.Equal(c.ValidateMigrateToken("", ""), false)
@@ -813,8 +815,8 @@ func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := TestClient(t, func(c *config.Config) {})
defer c.Shutdown()
c, cleanup := TestClient(t, func(c *config.Config) {})
defer cleanup()
assert.Equal(c.ValidateMigrateToken("", ""), true)
}
@@ -835,10 +837,10 @@ func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) {
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
defer cleanup()
// Registering a node over plaintext should succeed
{
@@ -911,7 +913,7 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem"
)
c1 := TestClient(t, func(c *config.Config) {
c1, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.TLSConfig = &nconfig.TLSConfig{
EnableHTTP: true,
@@ -922,7 +924,7 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
KeyFile: fookey,
}
})
defer c1.Shutdown()
defer cleanup()
// assert that when one node is running in encrypted mode, a RPC request to a
// node running in plaintext mode should fail
@@ -974,7 +976,8 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
// nomad server list.
func TestClient_ServerList(t *testing.T) {
t.Parallel()
client := TestClient(t, func(c *config.Config) {})
client, cleanup := TestClient(t, func(c *config.Config) {})
defer cleanup()
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)

View File

@@ -1,13 +1,43 @@
package config
import (
"io/ioutil"
"os"
"path/filepath"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/go-testing-interface"
)
// TestClientConfig returns a default client configuration for test clients.
func TestClientConfig() *Config {
// TestClientConfig returns a default client configuration for test clients and
// a cleanup func to remove the state and alloc dirs when finished.
func TestClientConfig(t testing.T) (*Config, func()) {
conf := DefaultConfig()
// Create a tempdir to hold state and alloc subdirs
parent, err := ioutil.TempDir("", "nomadtest")
if err != nil {
t.Fatalf("error creating client dir: %v", err)
}
cleanup := func() {
os.RemoveAll(parent)
}
allocDir := filepath.Join(parent, "allocs")
if err := os.Mkdir(allocDir, 0777); err != nil {
cleanup()
t.Fatalf("error creating alloc dir: %v", err)
}
conf.AllocDir = allocDir
stateDir := filepath.Join(parent, "client")
if err := os.Mkdir(stateDir, 0777); err != nil {
cleanup()
t.Fatalf("error creating alloc dir: %v", err)
}
conf.StateDir = stateDir
conf.VaultConfig.Enabled = helper.BoolToPtr(false)
conf.DevMode = true
conf.Node = &structs.Node{
@@ -19,5 +49,5 @@ func TestClientConfig() *Config {
// Loosen GC threshold
conf.GCDiskUsageThreshold = 98.0
conf.GCInodeUsageThreshold = 98.0
return conf
return conf, cleanup
}
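
Callers are expected to always run the returned cleanup func, typically via defer, so the temporary alloc and state dirs are removed even when the test fails. A minimal usage sketch (the test name is illustrative):

func TestUsesClientConfig(t *testing.T) {
	conf, cleanup := TestClientConfig(t)
	// Remove the temporary alloc/state dirs even if the test fails.
	defer cleanup()

	_ = conf // use conf.AllocDir / conf.StateDir as needed
}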

View File

@@ -20,10 +20,10 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
testClient := TestClient(t, nil)
testClient, cleanup := TestClient(t, nil)
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -48,10 +48,8 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) {
func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, nil)
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
testClient, cleanup := TestClient(t, nil)
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -76,14 +74,12 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
func TestFingerprintManager_Fingerprint_Run(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "true",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -109,15 +105,13 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"test.shutdown_periodic_after": "true",
"test.shutdown_periodic_duration": "2",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -172,16 +166,14 @@ func TestFingerprintManager_HealthCheck_Driver(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "1",
"test.shutdown_periodic_after": "true",
"test.shutdown_periodic_duration": "2",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -275,15 +267,13 @@ func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"test.shutdown_periodic_after": "true",
"test.shutdown_periodic_duration": "2",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -373,15 +363,13 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"test.shutdown_periodic_after": "true",
"test.shutdown_periodic_duration": "2",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -404,15 +392,13 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
func TestFingerprintManager_Run_InBlacklist(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"fingerprint.whitelist": " arch,memory,foo,bar ",
"fingerprint.blacklist": " cpu ",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -437,15 +423,13 @@ func TestFingerprintManager_Run_Combination(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"fingerprint.whitelist": " arch,cpu,memory,foo,bar ",
"fingerprint.blacklist": " memory,nomad ",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -471,15 +455,13 @@ func TestFingerprintManager_Run_Combination(t *testing.T) {
func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "1",
"driver.whitelist": " raw_exec , foo ",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -503,15 +485,13 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "1",
"driver.whitelist": " foo,bar,baz ",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -538,7 +518,7 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "1",
"driver.whitelist": " raw_exec,exec,foo,bar,baz ",
@@ -547,7 +527,7 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,
@@ -572,14 +552,14 @@ func TestFingerprintManager_Run_DriverFailure(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "1",
}
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
singLoader := testClient.config.PluginSingletonLoader
@@ -626,7 +606,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) {
t.Parallel()
require := require.New(t)
testClient := TestClient(t, func(c *config.Config) {
testClient, cleanup := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
"driver.raw_exec.enable": "1",
"driver.whitelist": " raw_exec,foo,bar,baz ",
@@ -635,7 +615,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) {
})
testClient.logger = testlog.HCLogger(t)
defer testClient.Shutdown()
defer cleanup()
fm := NewFingerprintManager(
testClient.config.PluginSingletonLoader,

View File

@@ -57,8 +57,8 @@ func TestFS_Stat_NoAlloc(t *testing.T) {
require := require.New(t)
// Start a client
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Make the request with bad allocation id
req := &cstructs.FsStatRequest{
@@ -79,8 +79,8 @@ func TestFS_Stat(t *testing.T) {
require := require.New(t)
// Start a client
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Create and add an alloc
a := mock.Alloc()
@@ -134,11 +134,11 @@ func TestFS_Stat_ACL(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer client.Shutdown()
defer cleanup()
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
@@ -196,8 +196,8 @@ func TestFS_List_NoAlloc(t *testing.T) {
require := require.New(t)
// Start a client
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Make the request with bad allocation id
req := &cstructs.FsListRequest{
@@ -218,8 +218,8 @@ func TestFS_List(t *testing.T) {
require := require.New(t)
// Start a client
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Create and add an alloc
a := mock.Alloc()
@@ -273,11 +273,11 @@ func TestFS_List_ACL(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer client.Shutdown()
defer cleanup()
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny})
@@ -335,8 +335,8 @@ func TestFS_Stream_NoAlloc(t *testing.T) {
require := require.New(t)
// Start a client
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Make the request with bad allocation id
req := &cstructs.FsStreamRequest{
@@ -414,11 +414,11 @@ func TestFS_Stream_ACL(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer client.Shutdown()
defer cleanup()
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
@@ -534,10 +534,10 @@ func TestFS_Stream(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
expected := "Hello from the other side"
job := mock.BatchJob()
@@ -645,10 +645,10 @@ func TestFS_Stream_Follow(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
expectedBase := "Hello from the other side"
repeat := 10
@@ -743,10 +743,10 @@ func TestFS_Stream_Limit(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
var limit int64 = 5
full := "Hello from the other side"
@@ -833,8 +833,8 @@ func TestFS_Logs_NoAlloc(t *testing.T) {
require := require.New(t)
// Start a client
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Make the request with bad allocation id
req := &cstructs.FsLogsRequest{
@@ -916,10 +916,10 @@ func TestFS_Logs_TaskPending(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
job := mock.BatchJob()
job.TaskGroups[0].Count = 1
@@ -1024,11 +1024,11 @@ func TestFS_Logs_ACL(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
client := TestClient(t, func(c *config.Config) {
client, cleanup := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer client.Shutdown()
defer cleanup()
// Create a bad token
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
@@ -1145,10 +1145,10 @@ func TestFS_Logs(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
expected := "Hello from the other side\n"
job := mock.BatchJob()
@@ -1247,10 +1247,10 @@ func TestFS_Logs_Follow(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
expectedBase := "Hello from the other side\n"
repeat := 10
@@ -1545,8 +1545,8 @@ func TestFS_findClosest(t *testing.T) {
func TestFS_streamFile_NoFile(t *testing.T) {
t.Parallel()
require := require.New(t)
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)
@@ -1565,8 +1565,8 @@ func TestFS_streamFile_NoFile(t *testing.T) {
func TestFS_streamFile_Modify(t *testing.T) {
t.Parallel()
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Get a temp alloc dir
ad := tempAllocDir(t)
@@ -1634,8 +1634,8 @@ func TestFS_streamFile_Modify(t *testing.T) {
func TestFS_streamFile_Truncate(t *testing.T) {
t.Parallel()
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Get a temp alloc dir
ad := tempAllocDir(t)
@@ -1739,8 +1739,8 @@ func TestFS_streamFile_Truncate(t *testing.T) {
func TestFS_streamImpl_Delete(t *testing.T) {
t.Parallel()
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Get a temp alloc dir
ad := tempAllocDir(t)
@@ -1807,8 +1807,8 @@ func TestFS_streamImpl_Delete(t *testing.T) {
func TestFS_logsImpl_NoFollow(t *testing.T) {
t.Parallel()
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Get a temp alloc dir and create the log dir
ad := tempAllocDir(t)
@@ -1874,8 +1874,8 @@ func TestFS_logsImpl_NoFollow(t *testing.T) {
func TestFS_logsImpl_Follow(t *testing.T) {
t.Parallel()
c := TestClient(t, nil)
defer c.Shutdown()
c, cleanup := TestClient(t, nil)
defer cleanup()
// Get a temp alloc dir and create the log dir
ad := tempAllocDir(t)

View File

@@ -19,10 +19,10 @@ func TestRpc_streamingRpcConn_badEndpoint(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s1.GetConfig().RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
@@ -75,7 +75,7 @@ func TestRpc_streamingRpcConn_badEndpoint_TLS(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c := TestClient(t, func(c *config.Config) {
c, cleanup := TestClient(t, func(c *config.Config) {
c.Region = "regionFoo"
c.Servers = []string{s1.GetConfig().RPCAddr.String()}
c.TLSConfig = &sconfig.TLSConfig{
@@ -87,7 +87,7 @@ func TestRpc_streamingRpcConn_badEndpoint_TLS(t *testing.T) {
KeyFile: fookey,
}
})
defer c.Shutdown()
defer cleanup()
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {

192
client/state/db_test.go Normal file
View File

@@ -0,0 +1,192 @@
package state
import (
"io/ioutil"
"os"
"reflect"
"testing"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
db, err := NewBoltStateDB(dir)
if err != nil {
if err := os.RemoveAll(dir); err != nil {
t.Logf("error removing boltdb dir: %v", err)
}
t.Fatalf("error creating boltdb: %v", err)
}
cleanup := func() {
if err := db.Close(); err != nil {
t.Errorf("error closing boltdb: %v", err)
}
if err := os.RemoveAll(dir); err != nil {
t.Logf("error removing boltdb dir: %v", err)
}
}
return db.(*BoltStateDB), cleanup
}
func testDB(t *testing.T, f func(*testing.T, StateDB)) {
boltdb, cleanup := setupBoltDB(t)
defer cleanup()
memdb := NewMemDB()
impls := []StateDB{boltdb, memdb}
for _, db := range impls {
db := db
t.Run(db.Name(), func(t *testing.T) {
f(t, db)
})
}
}
// TestStateDB asserts the behavior of GetAllAllocations, PutAllocation, and
// DeleteAllocationBucket for all operational StateDB implementations.
func TestStateDB_Allocations(t *testing.T) {
t.Parallel()
testDB(t, func(t *testing.T, db StateDB) {
require := require.New(t)
// Empty database should return empty non-nil results
allocs, errs, err := db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
require.Empty(allocs)
require.NotNil(errs)
require.Empty(errs)
// Put allocations
alloc1 := mock.Alloc()
alloc2 := mock.BatchAlloc()
//XXX Sadly roundtripping allocs loses time.Duration type
// information from the Config map[string]interface{}. As
// the mock driver itself will unmarshal run_for into the
// proper type, we can safely ignore it here.
delete(alloc2.Job.TaskGroups[0].Tasks[0].Config, "run_for")
require.NoError(db.PutAllocation(alloc1))
require.NoError(db.PutAllocation(alloc2))
// Retrieve them
allocs, errs, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
require.Len(allocs, 2)
for _, a := range allocs {
switch a.ID {
case alloc1.ID:
if !reflect.DeepEqual(a, alloc1) {
pretty.Ldiff(t, a, alloc1)
t.Fatalf("alloc %q unequal", a.ID)
}
case alloc2.ID:
if !reflect.DeepEqual(a, alloc2) {
pretty.Ldiff(t, a, alloc2)
t.Fatalf("alloc %q unequal", a.ID)
}
default:
t.Fatalf("unexpected alloc id %q", a.ID)
}
}
require.NotNil(errs)
require.Empty(errs)
// Add another
alloc3 := mock.SystemAlloc()
require.NoError(db.PutAllocation(alloc3))
allocs, errs, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
require.Len(allocs, 3)
require.Contains(allocs, alloc1)
require.Contains(allocs, alloc2)
require.Contains(allocs, alloc3)
require.NotNil(errs)
require.Empty(errs)
// Deleting a nonexistent alloc is a noop
require.NoError(db.DeleteAllocationBucket("asdf"))
allocs, _, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
require.Len(allocs, 3)
// Delete alloc1
require.NoError(db.DeleteAllocationBucket(alloc1.ID))
allocs, errs, err = db.GetAllAllocations()
require.NoError(err)
require.NotNil(allocs)
require.Len(allocs, 2)
require.Contains(allocs, alloc2)
require.Contains(allocs, alloc3)
require.NotNil(errs)
require.Empty(errs)
})
}
// TestStateDB_TaskState asserts the behavior of task state related StateDB
// methods.
func TestStateDB_TaskState(t *testing.T) {
t.Parallel()
testDB(t, func(t *testing.T, db StateDB) {
require := require.New(t)
// Getting nonexistent state should return nils
ls, ts, err := db.GetTaskRunnerState("allocid", "taskname")
require.NoError(err)
require.Nil(ls)
require.Nil(ts)
// Putting TaskState without first putting the allocation should work
state := structs.NewTaskState()
state.Failed = true // set a non-default value
require.NoError(db.PutTaskState("allocid", "taskname", state))
// Getting should return the available state
ls, ts, err = db.GetTaskRunnerState("allocid", "taskname")
require.NoError(err)
require.Nil(ls)
require.Equal(state, ts)
// Deleting a nonexistent task should not error
require.NoError(db.DeleteTaskBucket("adsf", "asdf"))
require.NoError(db.DeleteTaskBucket("asllocid", "asdf"))
// Data should be untouched
ls, ts, err = db.GetTaskRunnerState("allocid", "taskname")
require.NoError(err)
require.Nil(ls)
require.Equal(state, ts)
// Deleting the task should remove the state
require.NoError(db.DeleteTaskBucket("allocid", "taskname"))
ls, ts, err = db.GetTaskRunnerState("allocid", "taskname")
require.NoError(err)
require.Nil(ls)
require.Nil(ts)
// Putting LocalState should work just like TaskState
origLocalState := trstate.NewLocalState()
require.NoError(db.PutTaskRunnerLocalState("allocid", "taskname", origLocalState))
ls, ts, err = db.GetTaskRunnerState("allocid", "taskname")
require.NoError(err)
require.Equal(origLocalState, ls)
require.Nil(ts)
})
}

View File

@@ -7,12 +7,41 @@ import (
// StateDB implementations store and load Nomad client state.
type StateDB interface {
// Name of implementation.
Name() string
// GetAllAllocations returns all valid allocations and a map of
// allocation IDs to retrieval errors.
//
// If a single error is returned then both allocations and the map will be nil.
GetAllAllocations() ([]*structs.Allocation, map[string]error, error)
// PutAllocation stores an allocation or returns an error if it could
// not be stored.
PutAllocation(*structs.Allocation) error
// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. Either state may be nil if it is not found, but if an
// error is encountered only the error will be non-nil.
GetTaskRunnerState(allocID, taskName string) (*state.LocalState, *structs.TaskState, error)
PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error
// PutTaskRunnerLocalState stores the LocalState for a TaskRunner or
// returns an error.
PutTaskRunnerLocalState(allocID, taskName string, val *state.LocalState) error
// PutTaskState stores the TaskState for a TaskRunner or returns an
// error.
PutTaskState(allocID, taskName string, state *structs.TaskState) error
// DeleteTaskBucket deletes a task's state bucket if it exists. No
// error is returned if it does not exist.
DeleteTaskBucket(allocID, taskName string) error
// DeleteAllocationBucket deletes an allocation's state bucket if it
// exists. No error is returned if it does not exist.
DeleteAllocationBucket(allocID string) error
// Close the database. Unsafe for further use after calling regardless
// of return value.
Close() error
}
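
A minimal sketch of how a caller might exercise this interface end to end, mirroring the tests above. The "web" task name is a placeholder, MemDB (added in this change) stands in for the Bolt-backed implementation, and the import paths are inferred from the file layout shown in this diff.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Any StateDB implementation works here; MemDB needs no on-disk setup.
	var db state.StateDB = state.NewMemDB()
	defer db.Close()

	// Persist an allocation, then a TaskState keyed by alloc ID and task name.
	alloc := mock.Alloc()
	if err := db.PutAllocation(alloc); err != nil {
		panic(err)
	}
	ts := structs.NewTaskState()
	if err := db.PutTaskState(alloc.ID, "web", ts); err != nil { // "web" is a placeholder task name
		panic(err)
	}

	// Read everything back; errs maps alloc IDs to per-allocation restore errors.
	allocs, errs, err := db.GetAllAllocations()
	fmt.Println(len(allocs), len(errs), err)

	// Restore the task's state; LocalState is nil because none was stored.
	ls, restored, err := db.GetTaskRunnerState(alloc.ID, "web")
	fmt.Println(ls == nil, restored.State, err)
}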

144
client/state/memdb.go Normal file
View File

@@ -0,0 +1,144 @@
package state
import (
"sync"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// MemDB implements a StateDB that stores data in memory and should only be
// used for testing. All methods are safe for concurrent use.
type MemDB struct {
// alloc_id -> value
allocs map[string]*structs.Allocation
// alloc_id -> task_name -> value
localTaskState map[string]map[string]*state.LocalState
taskState map[string]map[string]*structs.TaskState
mu sync.RWMutex
}
func NewMemDB() *MemDB {
return &MemDB{
allocs: make(map[string]*structs.Allocation),
localTaskState: make(map[string]map[string]*state.LocalState),
taskState: make(map[string]map[string]*structs.TaskState),
}
}
func (m *MemDB) Name() string {
return "memdb"
}
func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
m.mu.RLock()
defer m.mu.RUnlock()
allocs := make([]*structs.Allocation, 0, len(m.allocs))
for _, v := range m.allocs {
allocs = append(allocs, v)
}
return allocs, map[string]error{}, nil
}
func (m *MemDB) PutAllocation(alloc *structs.Allocation) error {
m.mu.Lock()
defer m.mu.Unlock()
m.allocs[alloc.ID] = alloc
return nil
}
func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
m.mu.RLock()
defer m.mu.RUnlock()
var ls *state.LocalState
var ts *structs.TaskState
// Local Task State
allocLocalTS := m.localTaskState[allocID]
if len(allocLocalTS) != 0 {
ls = allocLocalTS[taskName]
}
// Task State
allocTS := m.taskState[allocID]
if len(allocTS) != 0 {
ts = allocTS[taskName]
}
return ls, ts, nil
}
func (m *MemDB) PutTaskRunnerLocalState(allocID string, taskName string, val *state.LocalState) error {
m.mu.Lock()
defer m.mu.Unlock()
if alts, ok := m.localTaskState[allocID]; ok {
alts[taskName] = val.Copy()
return nil
}
m.localTaskState[allocID] = map[string]*state.LocalState{
taskName: val.Copy(),
}
return nil
}
func (m *MemDB) PutTaskState(allocID string, taskName string, state *structs.TaskState) error {
m.mu.Lock()
defer m.mu.Unlock()
if ats, ok := m.taskState[allocID]; ok {
ats[taskName] = state.Copy()
return nil
}
m.taskState[allocID] = map[string]*structs.TaskState{
taskName: state.Copy(),
}
return nil
}
func (m *MemDB) DeleteTaskBucket(allocID, taskName string) error {
m.mu.Lock()
defer m.mu.Unlock()
if ats, ok := m.taskState[allocID]; ok {
delete(ats, taskName)
}
if alts, ok := m.localTaskState[allocID]; ok {
delete(alts, taskName)
}
return nil
}
func (m *MemDB) DeleteAllocationBucket(allocID string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.allocs, allocID)
delete(m.taskState, allocID)
delete(m.localTaskState, allocID)
return nil
}
func (m *MemDB) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
// Set everything to nil to blow up on further use
m.allocs = nil
m.taskState = nil
m.localTaskState = nil
return nil
}

View File

@@ -8,6 +8,10 @@ import (
// NoopDB implements a StateDB that does not persist any data.
type NoopDB struct{}
func (n NoopDB) Name() string {
return "noopdb"
}
func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
return nil, nil, nil
}
@@ -20,7 +24,7 @@ func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.Loca
return nil, nil, nil
}
func (n NoopDB) PutTaskRunnerLocalState(allocID string, taskName string, val interface{}) error {
func (n NoopDB) PutTaskRunnerLocalState(allocID string, taskName string, val *state.LocalState) error {
return nil
}

View File

@@ -21,9 +21,9 @@ allocations/ (bucket)
*/
var (
// allocationsBucket is the bucket name containing all allocation related
// allocationsBucketName is the bucket name containing all allocation related
// data
allocationsBucket = []byte("allocations")
allocationsBucketName = []byte("allocations")
// allocKey is the key serialized Allocations are stored under
allocKey = []byte("alloc")
@@ -74,6 +74,10 @@ func NewBoltStateDB(stateDir string) (StateDB, error) {
return sdb, nil
}
func (s *BoltStateDB) Name() string {
return "boltdb"
}
// GetAllAllocations gets all allocations persisted by this client and returns
// a map of alloc ids to errors for any allocations that could not be restored.
//
@@ -101,15 +105,15 @@ type allocEntry struct {
}
func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, map[string]error) {
allocationsBkt := tx.Bucket(allocationsBucket)
allocs := []*structs.Allocation{}
errs := map[string]error{}
allocationsBkt := tx.Bucket(allocationsBucketName)
if allocationsBkt == nil {
// No allocs
return nil, nil
return allocs, errs
}
var allocs []*structs.Allocation
errs := map[string]error{}
// Create a cursor for iteration.
c := allocationsBkt.BoltBucket().Cursor()
@@ -138,7 +142,7 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m
func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
return s.db.Update(func(tx *boltdd.Tx) error {
// Retrieve the root allocations bucket
allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket)
allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucketName)
if err != nil {
return err
}
@@ -157,25 +161,53 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
})
}
// GetTaskRunnerState restores TaskRunner specific state.
// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. LocalState or TaskState will be nil if they do not exist.
//
// If an error is encountered both LocalState and TaskState will be nil.
func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.LocalState, *structs.TaskState, error) {
var ls trstate.LocalState
var ts structs.TaskState
var ls *trstate.LocalState
var ts *structs.TaskState
err := s.db.View(func(tx *boltdd.Tx) error {
bkt, err := getTaskBucket(tx, allocID, taskName)
if err != nil {
return fmt.Errorf("failed to get task %q bucket: %v", taskName, err)
allAllocsBkt := tx.Bucket(allocationsBucketName)
if allAllocsBkt == nil {
// No state, return
return nil
}
// Restore Local State
if err := bkt.Get(taskLocalStateKey, &ls); err != nil {
return fmt.Errorf("failed to read local task runner state: %v", err)
allocBkt := allAllocsBkt.Bucket([]byte(allocID))
if allocBkt == nil {
// No state for alloc, return
return nil
}
// Restore Task State
if err := bkt.Get(taskStateKey, &ts); err != nil {
return fmt.Errorf("failed to read task state: %v", err)
taskBkt := allocBkt.Bucket([]byte(taskName))
if taskBkt == nil {
// No state for task, return
return nil
}
// Restore Local State if it exists
ls = &trstate.LocalState{}
if err := taskBkt.Get(taskLocalStateKey, ls); err != nil {
if !boltdd.IsErrNotFound(err) {
return fmt.Errorf("failed to read local task runner state: %v", err)
}
// Key not found, reset ls to nil
ls = nil
}
// Restore Task State if it exists
ts = &structs.TaskState{}
if err := taskBkt.Get(taskStateKey, ts); err != nil {
if !boltdd.IsErrNotFound(err) {
return fmt.Errorf("failed to read task state: %v", err)
}
// Key not found, reset ts to nil
ts = nil
}
return nil
@@ -185,12 +217,11 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
return nil, nil, err
}
return &ls, &ts, nil
return ls, ts, nil
}
// PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error.
func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error {
func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
taskBkt, err := getTaskBucket(tx, allocID, taskName)
if err != nil {
@@ -221,7 +252,7 @@ func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.Task
func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
return s.db.Update(func(tx *boltdd.Tx) error {
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
allocations := tx.Bucket(allocationsBucketName)
if allocations == nil {
return nil
}
@@ -242,7 +273,7 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error {
return s.db.Update(func(tx *boltdd.Tx) error {
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
allocations := tx.Bucket(allocationsBucketName)
if allocations == nil {
return nil
}
@@ -267,13 +298,13 @@ func getAllocationBucket(tx *boltdd.Tx, allocID string) (*boltdd.Bucket, error)
w := tx.Writable()
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
allocations := tx.Bucket(allocationsBucketName)
if allocations == nil {
if !w {
return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable")
}
allocations, err = tx.CreateBucketIfNotExists(allocationsBucket)
allocations, err = tx.CreateBucketIfNotExists(allocationsBucketName)
if err != nil {
return nil, err
}

View File

@@ -11,9 +11,14 @@ import (
"github.com/mitchellh/go-testing-interface"
)
// TestClient creates an in-memory client for testing purposes.
func TestClient(t testing.T, cb func(c *config.Config)) *Client {
conf := config.TestClientConfig()
// TestClient creates an in-memory client for testing purposes and returns a
// cleanup func to shut down the client and remove the alloc and state dirs.
//
// There is no need to override the AllocDir or StateDir as they are randomized
// and removed in the returned cleanup function. If they are overridden in the
// callback then the caller still must run the returned cleanup func.
func TestClient(t testing.T, cb func(c *config.Config)) (*Client, func()) {
conf, cleanup := config.TestClientConfig(t)
// Tighten the fingerprinter timeouts (must be done in client package
// to avoid circular dependencies)
@@ -38,7 +43,14 @@ func TestClient(t testing.T, cb func(c *config.Config)) *Client {
mockService := consulApi.NewMockConsulServiceClient(t, logger)
client, err := NewClient(conf, catalog, mockService)
if err != nil {
cleanup()
t.Fatalf("err: %v", err)
}
return client
return client, func() {
// Shutdown client
client.Shutdown()
// Call TestClientConfig cleanup
cleanup()
}
}

View File

@@ -27,8 +27,8 @@ type mockUpdater struct {
logger log.Logger
}
func (m *mockUpdater) TaskStateUpdated(task string, state *structs.TaskState) {
m.logger.Named("test.updater").Debug("update", "task", task, "state", state)
func (m *mockUpdater) TaskStateUpdated() {
m.logger.Named("mock.updater").Debug("Update!")
}
// TODO Fix

View File

@@ -14,6 +14,29 @@ import (
"golang.org/x/crypto/blake2b"
)
// ErrNotFound is returned when a key is not found.
type ErrNotFound struct {
name string
}
func (e *ErrNotFound) Error() string {
return fmt.Sprintf("key not found: %s", e.name)
}
// NotFound returns a new error for a key that was not found.
func NotFound(name string) error {
return &ErrNotFound{name}
}
// IsErrNotFound returns true if the error is an ErrNotFound error.
func IsErrNotFound(e error) bool {
if e == nil {
return false
}
_, ok := e.(*ErrNotFound)
return ok
}
// DB wraps an underlying bolt.DB to create write deduplicating buckets and
// msgpack encoded values.
type DB struct {
@@ -277,12 +300,13 @@ func (b *Bucket) Put(key []byte, val interface{}) error {
}
// Get value by key from boltdb or return an error if key not found.
// Get value by key from boltdb or return an ErrNotFound error if key not
// found.
func (b *Bucket) Get(key []byte, obj interface{}) error {
// Get the raw data from the underlying boltdb
data := b.boltBucket.Get(key)
if data == nil {
return fmt.Errorf("no data at key %v", string(key))
return NotFound(string(key))
}
// Deserialize the object
@@ -342,11 +366,15 @@ func (b *Bucket) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
return newBucket(bmeta, bb), nil
}
// DeleteBucket deletes a child bucket. Returns an error if the bucket does not
// exist or corresponds to a non-bucket key.
// DeleteBucket deletes a child bucket. Returns an error if the bucket
// corresponds to a non-bucket key or another error is encountered. No error is
// returned if the bucket does not exist.
func (b *Bucket) DeleteBucket(name []byte) error {
// Delete the bucket from the underlying boltdb
err := b.boltBucket.DeleteBucket(name)
if err == bolt.ErrBucketNotFound {
err = nil
}
// Remove reference to child bucket
b.bm.deleteBucket(name)

View File

@@ -232,7 +232,9 @@ func TestBucket_Delete(t *testing.T) {
require.NoError(err)
var v []byte
require.Error(child.Get(childKey, &v))
err = child.Get(childKey, &v)
require.Error(err)
require.True(IsErrNotFound(err))
require.Equal(([]byte)(nil), v)
require.Nil(child.Bucket(grandchildName1))
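
The test above checks the not-found path directly; the following is a minimal sketch of the caller-side pattern the new helpers enable. The DB handle, bucket name, key, and value type are placeholders, and the helper/boltdd import path is assumed from this diff.

package boltddexample

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/boltdd"
)

// counter is a hypothetical msgpack-encoded value type used only for this sketch.
type counter struct {
	N int
}

// readCounter distinguishes "never written" from genuine read or decode failures.
func readCounter(db *boltdd.DB) (int, bool, error) {
	var c counter
	err := db.View(func(tx *boltdd.Tx) error {
		bkt := tx.Bucket([]byte("counters")) // hypothetical bucket name
		if bkt == nil {
			// Report a missing bucket the same way as a missing key.
			return boltdd.NotFound("counters")
		}
		return bkt.Get([]byte("hits"), &c)
	})
	if boltdd.IsErrNotFound(err) {
		return 0, false, nil // absent is not an error for the caller
	}
	if err != nil {
		return 0, false, fmt.Errorf("failed reading counter: %v", err)
	}
	return c.N, true, nil
}

This mirrors how GetTaskRunnerState above treats a missing LocalState or TaskState key as "no state" rather than a failure.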

View File

@@ -3,6 +3,7 @@ package nomad
import (
"fmt"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
@@ -26,10 +27,10 @@ func TestClientAllocations_GarbageCollectAll_Local(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
@@ -188,11 +189,11 @@ func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) {
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer c.Shutdown()
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
@@ -268,11 +269,11 @@ func TestClientAllocations_GarbageCollect_Local(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -283,7 +284,7 @@ func TestClientAllocations_GarbageCollect_Local(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -417,11 +418,11 @@ func TestClientAllocations_GarbageCollect_Remote(t *testing.T) {
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -432,7 +433,7 @@ func TestClientAllocations_GarbageCollect_Remote(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -539,10 +540,10 @@ func TestClientAllocations_Stats_Local(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -553,7 +554,7 @@ func TestClientAllocations_Stats_Local(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -688,10 +689,10 @@ func TestClientAllocations_Stats_Remote(t *testing.T) {
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -702,7 +703,7 @@ func TestClientAllocations_Stats_Remote(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{

View File

@@ -31,10 +31,10 @@ func TestClientFS_List_Local(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -45,7 +45,7 @@ func TestClientFS_List_Local(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -183,10 +183,10 @@ func TestClientFS_List_Remote(t *testing.T) {
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -197,7 +197,7 @@ func TestClientFS_List_Remote(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -300,10 +300,10 @@ func TestClientFS_Stat_Local(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -314,7 +314,7 @@ func TestClientFS_Stat_Local(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -452,10 +452,10 @@ func TestClientFS_Stat_Remote(t *testing.T) {
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
a := mock.Alloc()
@@ -466,7 +466,7 @@ func TestClientFS_Stat_Remote(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
@@ -719,10 +719,10 @@ func TestClientFS_Streaming_Local(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expected := "Hello from the other side"
@@ -734,7 +734,7 @@ func TestClientFS_Streaming_Local(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
"stdout_string": expected,
},
LogConfig: structs.DefaultLogConfig(),
@@ -851,10 +851,10 @@ func TestClientFS_Streaming_Local_Follow(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expectedBase := "Hello from the other side"
@@ -868,7 +868,7 @@ func TestClientFS_Streaming_Local_Follow(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "20s",
"run_for": 2 * time.Second,
"stdout_string": expectedBase,
"stdout_repeat": repeat,
"stdout_repeat_duration": 200 * time.Millisecond,
@@ -995,10 +995,10 @@ func TestClientFS_Streaming_Remote_Server(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expected := "Hello from the other side"
@@ -1010,7 +1010,7 @@ func TestClientFS_Streaming_Remote_Server(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
"stdout_string": expected,
},
LogConfig: structs.DefaultLogConfig(),
@@ -1141,11 +1141,11 @@ func TestClientFS_Streaming_Remote_Region(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.Region = "two"
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expected := "Hello from the other side"
@@ -1157,7 +1157,7 @@ func TestClientFS_Streaming_Remote_Region(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
"stdout_string": expected,
},
LogConfig: structs.DefaultLogConfig(),
@@ -1541,10 +1541,10 @@ func TestClientFS_Logs_Local(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expected := "Hello from the other side"
@@ -1556,7 +1556,7 @@ func TestClientFS_Logs_Local(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
"stdout_string": expected,
},
LogConfig: structs.DefaultLogConfig(),
@@ -1674,10 +1674,10 @@ func TestClientFS_Logs_Local_Follow(t *testing.T) {
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expectedBase := "Hello from the other side"
@@ -1691,7 +1691,7 @@ func TestClientFS_Logs_Local_Follow(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "20s",
"run_for": 20 * time.Second,
"stdout_string": expectedBase,
"stdout_repeat": repeat,
"stdout_repeat_duration": 200 * time.Millisecond,
@@ -1819,10 +1819,10 @@ func TestClientFS_Logs_Remote_Server(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expected := "Hello from the other side"
@@ -1834,7 +1834,7 @@ func TestClientFS_Logs_Remote_Server(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
"stdout_string": expected,
},
LogConfig: structs.DefaultLogConfig(),
@@ -1966,11 +1966,11 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.Region = "two"
})
defer c.Shutdown()
defer cleanup()
// Force an allocation onto the node
expected := "Hello from the other side"
@@ -1982,7 +1982,7 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) {
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
"run_for": 2 * time.Second,
"stdout_string": expected,
},
LogConfig: structs.DefaultLogConfig(),

View File

@@ -264,10 +264,10 @@ func TestNodeStreamingRpc_badEndpoint(t *testing.T) {
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s1.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {

View File

@@ -25,10 +25,10 @@ func TestClientStats_Stats_Local(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
@@ -183,10 +183,10 @@ func TestClientStats_Stats_Remote(t *testing.T) {
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c := client.TestClient(t, func(c *config.Config) {
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()

View File

@@ -870,7 +870,6 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
// Test that transitions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel()
require := require.New(t)
for _, inf := range []bool{true, false} {
name := "Infinite"
@@ -878,6 +877,7 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
name = "Deadline"
}
t.Run(name, func(t *testing.T) {
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
@@ -948,12 +948,12 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
// Make sure the batch job isn't affected
testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
return false, fmt.Errorf("check alloc promoter error: %v", err)
}
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
return false, fmt.Errorf("AllocsByNode error: %v", err)
}
for _, alloc := range allocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {

View File

@@ -264,7 +264,7 @@ func BatchJob() *structs.Job {
Name: "worker",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "500ms",
"run_for": 500 * time.Millisecond,
},
Env: map[string]string{
"FOO": "bar",

View File

@@ -5573,6 +5573,21 @@ type TaskState struct {
Events []*TaskEvent
}
// NewTaskState returns a TaskState initialized in the Pending state.
func NewTaskState() *TaskState {
return &TaskState{
State: TaskStatePending,
}
}
// Canonicalize ensures the TaskState has a State set. It should default to
// Pending.
func (ts *TaskState) Canonicalize() {
if ts.State == "" {
ts.State = TaskStatePending
}
}
func (ts *TaskState) Copy() *TaskState {
if ts == nil {
return nil

View File

@@ -29,6 +29,10 @@ func (h *TaskHandle) GetDriverState(v interface{}) error {
}
func (h *TaskHandle) Copy() *TaskHandle {
if h == nil {
return nil
}
handle := new(TaskHandle)
*handle = *h
handle.Config = h.Config.Copy()