Refactor - wip
@@ -1,10 +1,9 @@
package client
package allocrunner

import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
@@ -13,7 +12,10 @@ import (
"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
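The new imports summarize the refactor: the alloc runner moves to client/allocrunner, task runners to client/allocrunner/taskrunner, the Consul service interface comes from client/consul under the consulApi alias, and the Bolt bucket helpers are now exported from client/state. As a rough sketch (not part of this commit), a caller outside the package would build a runner much like the test helper later in this diff, using only exported names; treating AllocStateUpdater as a func type is an assumption here.

package allocrunner_test

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/boltdb/bolt"
	"github.com/hashicorp/nomad/client/allocrunner"
	"github.com/hashicorp/nomad/client/config"
	consulApi "github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

// newRunner mirrors TestAllocRunnerFromAlloc from this diff, but written from
// outside the package so only the exported surface is used; error handling elided.
func newRunner(t *testing.T) *allocrunner.AllocRunner {
	conf := config.DefaultConfig()
	conf.Node = mock.Node()
	conf.StateDir = os.TempDir()
	conf.AllocDir = os.TempDir()

	tmp, _ := ioutil.TempFile("", "state-db")
	db, _ := bolt.Open(tmp.Name(), 0600, nil)

	alloc := mock.Alloc()
	update := func(a *structs.Allocation) {} // assumed: AllocStateUpdater is a func type
	vclient := vaultclient.NewMockVaultClient()
	cclient := consulApi.NewMockConsulServiceClient(t)

	return allocrunner.NewAllocRunner(testlog.Logger(t), conf, db, update,
		alloc, vclient, cclient, allocrunner.NoopPrevAlloc{})
}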
@@ -60,7 +62,7 @@ type AllocRunner struct {
allocDir *allocdir.AllocDir
allocDirLock sync.Mutex

tasks map[string]*TaskRunner
tasks map[string]*taskrunner.TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
taskLock sync.RWMutex
@@ -70,7 +72,7 @@ type AllocRunner struct {
updateCh chan *structs.Allocation

vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI
consulClient consulApi.ConsulServiceAPI

// prevAlloc allows for waiting until a previous allocation exits and
// then migrates its data. If sticky volumes aren't used and there's no
@@ -154,7 +156,7 @@ type allocRunnerMutableState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI,
prevAlloc prevAllocWatcher) *AllocRunner {

ar := &AllocRunner{
@@ -168,7 +170,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*TaskRunner),
tasks: make(map[string]*taskrunner.TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
@@ -220,82 +222,44 @@ func (r *AllocRunner) pre060StateFilePath() string {
|
||||
|
||||
// RestoreState is used to restore the state of the alloc runner
|
||||
func (r *AllocRunner) RestoreState() error {
|
||||
|
||||
// COMPAT: Remove in 0.7.0
|
||||
// Check if the old snapshot is there
|
||||
oldPath := r.pre060StateFilePath()
|
||||
var snap allocRunnerState
|
||||
var upgrading bool
|
||||
if err := pre060RestoreState(oldPath, &snap); err == nil {
|
||||
// Restore fields
|
||||
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.allocID)
|
||||
r.alloc = snap.Alloc
|
||||
r.allocDir = snap.AllocDir
|
||||
r.allocClientStatus = snap.AllocClientStatus
|
||||
r.allocClientDescription = snap.AllocClientDescription
|
||||
|
||||
if r.alloc != nil {
|
||||
r.taskStates = snap.Alloc.TaskStates
|
||||
}
|
||||
|
||||
// COMPAT: Remove in 0.7.0
|
||||
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
|
||||
// Context struct to new AllocDir struct
|
||||
if snap.AllocDir == nil && snap.Context != nil {
|
||||
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.allocID)
|
||||
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
|
||||
for taskName := range snap.Context.AllocDir.TaskDirs {
|
||||
r.allocDir.NewTaskDir(taskName)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the old state
|
||||
os.RemoveAll(oldPath)
|
||||
upgrading = true
|
||||
} else if !os.IsNotExist(err) {
|
||||
// Something corrupt in the old state file
|
||||
return err
|
||||
} else {
|
||||
// We are doing a normal restore
|
||||
err := r.stateDB.View(func(tx *bolt.Tx) error {
|
||||
bkt, err := getAllocationBucket(tx, r.allocID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get allocation bucket: %v", err)
|
||||
}
|
||||
|
||||
// Get the state objects
|
||||
var mutable allocRunnerMutableState
|
||||
var immutable allocRunnerImmutableState
|
||||
var allocState allocRunnerAllocState
|
||||
var allocDir allocdir.AllocDir
|
||||
|
||||
if err := getObject(bkt, allocRunnerStateAllocKey, &allocState); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner alloc state: %v", err)
|
||||
}
|
||||
if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
|
||||
}
|
||||
if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
|
||||
}
|
||||
if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
|
||||
}
|
||||
|
||||
// Populate the fields
|
||||
r.alloc = allocState.Alloc
|
||||
r.allocDir = &allocDir
|
||||
r.allocClientStatus = mutable.AllocClientStatus
|
||||
r.allocClientDescription = mutable.AllocClientDescription
|
||||
r.taskStates = mutable.TaskStates
|
||||
r.alloc.ClientStatus = getClientStatus(r.taskStates)
|
||||
r.alloc.DeploymentStatus = mutable.DeploymentStatus
|
||||
return nil
|
||||
})
|
||||
|
||||
err := r.stateDB.View(func(tx *bolt.Tx) error {
|
||||
bkt, err := state.GetAllocationBucket(tx, r.allocID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read allocation state: %v", err)
|
||||
return fmt.Errorf("failed to get allocation bucket: %v", err)
|
||||
}
|
||||
|
||||
// Get the state objects
|
||||
var mutable allocRunnerMutableState
|
||||
var immutable allocRunnerImmutableState
|
||||
var allocState allocRunnerAllocState
|
||||
var allocDir allocdir.AllocDir
|
||||
|
||||
if err := state.GetObject(bkt, allocRunnerStateAllocKey, &allocState); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner alloc state: %v", err)
|
||||
}
|
||||
if err := state.GetObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
|
||||
}
|
||||
if err := state.GetObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
|
||||
}
|
||||
if err := state.GetObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
|
||||
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
|
||||
}
|
||||
|
||||
// Populate the fields
|
||||
r.alloc = allocState.Alloc
|
||||
r.allocDir = &allocDir
|
||||
r.allocClientStatus = mutable.AllocClientStatus
|
||||
r.allocClientDescription = mutable.AllocClientDescription
|
||||
r.taskStates = mutable.TaskStates
|
||||
r.alloc.ClientStatus = getClientStatus(r.taskStates)
|
||||
r.alloc.DeploymentStatus = mutable.DeploymentStatus
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read allocation state: %v", err)
|
||||
}
|
||||
|
||||
var snapshotErrors multierror.Error
|
||||
@@ -344,7 +308,7 @@ func (r *AllocRunner) RestoreState() error {
|
||||
continue
|
||||
}
|
||||
|
||||
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
|
||||
tr := taskrunner.NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
|
||||
r.tasks[name] = tr
|
||||
|
||||
if restartReason, err := tr.RestoreState(); err != nil {
|
||||
@@ -354,12 +318,6 @@ func (r *AllocRunner) RestoreState() error {
|
||||
// Only start if the alloc isn't in a terminal status.
|
||||
go tr.Run()
|
||||
|
||||
if upgrading {
|
||||
if err := tr.SaveState(); err != nil {
|
||||
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.allocID, name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Restart task runner if RestoreState gave a reason
|
||||
if restartReason != "" {
|
||||
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
|
||||
@@ -389,7 +347,7 @@ func (r *AllocRunner) SaveState() error {
|
||||
for _, tr := range runners {
|
||||
if err := tr.SaveState(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to save state for alloc %s task %q: %v",
|
||||
r.allocID, tr.task.Name, err))
|
||||
r.allocID, tr.Name(), err))
|
||||
}
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
@@ -419,7 +377,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
||||
return r.stateDB.Batch(func(tx *bolt.Tx) error {
|
||||
|
||||
// Grab the allocation bucket
|
||||
allocBkt, err := getAllocationBucket(tx, r.allocID)
|
||||
allocBkt, err := state.GetAllocationBucket(tx, r.allocID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
|
||||
}
|
||||
@@ -433,7 +391,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
||||
Alloc: alloc,
|
||||
}
|
||||
|
||||
if err := putObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil {
|
||||
if err := state.PutObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil {
|
||||
return fmt.Errorf("failed to write alloc_runner alloc state: %v", err)
|
||||
}
|
||||
|
||||
@@ -450,7 +408,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
||||
Version: r.config.Version.VersionNumber(),
|
||||
}
|
||||
|
||||
if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil {
|
||||
if err := state.PutObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil {
|
||||
return fmt.Errorf("failed to write alloc_runner immutable state: %v", err)
|
||||
}
|
||||
|
||||
@@ -461,7 +419,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
||||
|
||||
// Write the alloc dir data if it hasn't been written before and it exists.
|
||||
if !r.allocDirPersisted && allocDir != nil {
|
||||
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
|
||||
if err := state.PutObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
|
||||
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
|
||||
}
|
||||
|
||||
@@ -478,7 +436,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
||||
DeploymentStatus: alloc.DeploymentStatus,
|
||||
}
|
||||
|
||||
if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
|
||||
if err := state.PutObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
|
||||
return fmt.Errorf("failed to write alloc_runner mutable state: %v", err)
|
||||
}
|
||||
|
||||
@@ -492,7 +450,7 @@ func (r *AllocRunner) DestroyState() error {
|
||||
defer r.allocStateLock.Unlock()
|
||||
|
||||
return r.stateDB.Update(func(tx *bolt.Tx) error {
|
||||
if err := deleteAllocationBucket(tx, r.allocID); err != nil {
|
||||
if err := state.DeleteAllocationBucket(tx, r.allocID); err != nil {
|
||||
return fmt.Errorf("failed to delete allocation bucket: %v", err)
|
||||
}
|
||||
return nil
|
||||
@@ -754,14 +712,14 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
|
||||
|
||||
// Find all tasks that are not the one that is dead and check if the one
|
||||
// that is dead is a leader
|
||||
var otherTaskRunners []*TaskRunner
|
||||
var otherTaskRunners []*taskrunner.TaskRunner
|
||||
var otherTaskNames []string
|
||||
leader := false
|
||||
for task, tr := range r.tasks {
|
||||
if task != taskName {
|
||||
otherTaskRunners = append(otherTaskRunners, tr)
|
||||
otherTaskNames = append(otherTaskNames, task)
|
||||
} else if tr.task.Leader {
|
||||
} else if tr.IsLeader() {
|
||||
leader = true
|
||||
}
|
||||
}
|
||||
@@ -922,7 +880,7 @@ func (r *AllocRunner) Run() {
|
||||
taskdir := r.allocDir.NewTaskDir(task.Name)
|
||||
r.allocDirLock.Unlock()
|
||||
|
||||
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
|
||||
tr := taskrunner.NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
|
||||
r.tasks[task.Name] = tr
|
||||
tr.MarkReceived()
|
||||
|
||||
@@ -1118,11 +1076,11 @@ func (r *AllocRunner) StatsReporter() AllocStatsReporter {
|
||||
|
||||
// getTaskRunners is a helper that returns a copy of the task runners list using
|
||||
// the taskLock.
|
||||
func (r *AllocRunner) getTaskRunners() []*TaskRunner {
|
||||
func (r *AllocRunner) getTaskRunners() []*taskrunner.TaskRunner {
|
||||
// Get the task runners
|
||||
r.taskLock.RLock()
|
||||
defer r.taskLock.RUnlock()
|
||||
runners := make([]*TaskRunner, 0, len(r.tasks))
|
||||
runners := make([]*taskrunner.TaskRunner, 0, len(r.tasks))
|
||||
for _, tr := range r.tasks {
|
||||
runners = append(runners, tr)
|
||||
}
|
||||
@@ -1156,7 +1114,7 @@ func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResour
|
||||
for _, tr := range runners {
|
||||
l := tr.LatestResourceUsage()
|
||||
if l != nil {
|
||||
astat.Tasks[tr.task.Name] = l
|
||||
astat.Tasks[tr.Name()] = l
|
||||
flat = append(flat, l)
|
||||
if l.Timestamp > astat.Timestamp {
|
||||
astat.Timestamp = l.Timestamp
|
||||
@@ -1181,9 +1139,9 @@ func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.Resour
|
||||
return summed
|
||||
}
|
||||
|
||||
// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
|
||||
// ShouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
|
||||
// checks if the current running allocation is behind and should be updated.
|
||||
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
|
||||
func (r *AllocRunner) ShouldUpdate(serverIndex uint64) bool {
|
||||
r.allocLock.Lock()
|
||||
defer r.allocLock.Unlock()
|
||||
return r.alloc.AllocModifyIndex < serverIndex
|
||||
@@ -1215,3 +1173,11 @@ func (r *AllocRunner) IsDestroyed() bool {
|
||||
func (r *AllocRunner) WaitCh() <-chan struct{} {
|
||||
return r.waitCh
|
||||
}
|
||||
|
||||
// AllocID returns the allocation ID of the allocation being run
|
||||
func (r *AllocRunner) AllocID() string {
|
||||
if r == nil {
|
||||
return ""
|
||||
}
|
||||
return r.allocID
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
@@ -160,7 +161,7 @@ type allocHealthTracker struct {
|
||||
allocUpdates *cstructs.AllocListener
|
||||
|
||||
// consulClient is used to look up the state of the task's checks
|
||||
consulClient ConsulServiceAPI
|
||||
consulClient consulApi.ConsulServiceAPI
|
||||
|
||||
// healthy is used to signal whether we have determined the allocation to be
|
||||
// healthy or unhealthy
|
||||
@@ -191,7 +192,7 @@ type allocHealthTracker struct {
|
||||
// alloc listener and consul API object are given so that the watcher can detect
|
||||
// health changes.
|
||||
func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc *structs.Allocation,
|
||||
allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI,
|
||||
allocUpdates *cstructs.AllocListener, consulClient consulApi.ConsulServiceAPI,
|
||||
minHealthyTime time.Duration, useChecks bool) *allocHealthTracker {
|
||||
|
||||
a := &allocHealthTracker{
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -6,93 +6,34 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/nomad/version"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type MockAllocStateUpdater struct {
|
||||
Allocs []*structs.Allocation
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// Update fulfills the TaskStateUpdater interface
|
||||
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
|
||||
m.mu.Lock()
|
||||
m.Allocs = append(m.Allocs, alloc)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// Last returns a copy of the last alloc (or nil) sync'd
|
||||
func (m *MockAllocStateUpdater) Last() *structs.Allocation {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
n := len(m.Allocs)
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
return m.Allocs[n-1].Copy()
|
||||
}
|
||||
|
||||
// allocationBucketExists checks if the allocation bucket was created.
|
||||
func allocationBucketExists(tx *bolt.Tx, allocID string) bool {
|
||||
allocations := tx.Bucket(allocationsBucket)
|
||||
if allocations == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Retrieve the specific allocations bucket
|
||||
alloc := allocations.Bucket([]byte(allocID))
|
||||
return alloc != nil
|
||||
}
|
||||
|
||||
func testAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
|
||||
conf := config.DefaultConfig()
|
||||
conf.Node = mock.Node()
|
||||
conf.StateDir = os.TempDir()
|
||||
conf.AllocDir = os.TempDir()
|
||||
tmp, _ := ioutil.TempFile("", "state-db")
|
||||
db, _ := bolt.Open(tmp.Name(), 0600, nil)
|
||||
upd := &MockAllocStateUpdater{}
|
||||
if !restarts {
|
||||
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
|
||||
alloc.Job.Type = structs.JobTypeBatch
|
||||
}
|
||||
vclient := vaultclient.NewMockVaultClient()
|
||||
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient(t), noopPrevAlloc{})
|
||||
return upd, ar
|
||||
}
|
||||
|
||||
func testAllocRunner(t *testing.T, restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
|
||||
// Use mock driver
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config["run_for"] = "500ms"
|
||||
return testAllocRunnerFromAlloc(t, alloc, restarts)
|
||||
bucket, err := state.GetAllocationBucket(tx, allocID)
|
||||
return err == nil && bucket != nil
|
||||
}
|
||||
|
||||
func TestAllocRunner_SimpleRun(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
@@ -114,7 +55,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
|
||||
func TestAllocRunner_FinishedAtSet(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
_, ar := testAllocRunner(t, false)
|
||||
_, ar := TestAllocRunner(t, false)
|
||||
ar.allocClientStatus = structs.AllocClientStatusFailed
|
||||
alloc := ar.Alloc()
|
||||
taskFinishedAt := make(map[string]time.Time)
|
||||
@@ -136,7 +77,7 @@ func TestAllocRunner_FinishedAtSet(t *testing.T) {
|
||||
func TestAllocRunner_FinishedAtSet_TaskEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
_, ar := testAllocRunner(t, false)
|
||||
_, ar := TestAllocRunner(t, false)
|
||||
ar.taskStates[ar.alloc.Job.TaskGroups[0].Tasks[0].Name] = &structs.TaskState{State: structs.TaskStateDead, Failed: true}
|
||||
|
||||
alloc := ar.Alloc()
|
||||
@@ -161,7 +102,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_BadStart(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
// Ensure the task fails and restarts
|
||||
upd, ar := testAllocRunner(t, true)
|
||||
upd, ar := TestAllocRunner(t, true)
|
||||
|
||||
// Make the task fail
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -208,7 +149,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Deadline(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Don't restart but force service job type
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
ar.alloc.Job.Type = structs.JobTypeService
|
||||
|
||||
// Make the task block
|
||||
@@ -268,7 +209,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_NoChecks(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Ensure the task fails and restarts
|
||||
upd, ar := testAllocRunner(t, true)
|
||||
upd, ar := TestAllocRunner(t, true)
|
||||
|
||||
// Make the task run healthy
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -316,7 +257,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Ensure the task fails and restarts
|
||||
upd, ar := testAllocRunner(t, true)
|
||||
upd, ar := TestAllocRunner(t, true)
|
||||
|
||||
// Make the task fail
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -347,7 +288,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) {
|
||||
|
||||
// Only return the check as healthy after a duration
|
||||
trigger := time.After(500 * time.Millisecond)
|
||||
ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
|
||||
ar.consulClient.(*consulApi.MockConsulServiceClient).AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
|
||||
select {
|
||||
case <-trigger:
|
||||
return &consul.AllocRegistration{
|
||||
@@ -409,7 +350,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
// Ensure the task fails and restarts
|
||||
upd, ar := testAllocRunner(t, true)
|
||||
upd, ar := TestAllocRunner(t, true)
|
||||
|
||||
// Make the task fail
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -430,7 +371,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
|
||||
}
|
||||
|
||||
// Only return the check as healthy after a duration
|
||||
ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
|
||||
ar.consulClient.(*consulApi.MockConsulServiceClient).AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
|
||||
return &consul.AllocRegistration{
|
||||
Tasks: map[string]*consul.TaskRegistration{
|
||||
task.Name: {
|
||||
@@ -478,7 +419,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_UpdatedDeployment(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Ensure the task fails and restarts
|
||||
upd, ar := testAllocRunner(t, true)
|
||||
upd, ar := TestAllocRunner(t, true)
|
||||
|
||||
// Make the task run healthy
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -535,7 +476,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_Migration(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Ensure the task fails and restarts
|
||||
upd, ar := testAllocRunner(t, true)
|
||||
upd, ar := TestAllocRunner(t, true)
|
||||
|
||||
// Make the task run healthy
|
||||
tg := ar.alloc.Job.TaskGroups[0]
|
||||
@@ -588,7 +529,7 @@ func TestAllocRunner_DeploymentHealth_BatchDisabled(t *testing.T) {
|
||||
task := tg.Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config["run_for"] = "5s"
|
||||
upd, ar := testAllocRunnerFromAlloc(t, alloc, false)
|
||||
upd, ar := TestAllocRunnerFromAlloc(t, alloc, false)
|
||||
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
@@ -634,7 +575,7 @@ func TestAllocRunner_RetryArtifact(t *testing.T) {
|
||||
}
|
||||
|
||||
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, badtask)
|
||||
upd, ar := testAllocRunnerFromAlloc(t, alloc, true)
|
||||
upd, ar := TestAllocRunnerFromAlloc(t, alloc, true)
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
@@ -673,7 +614,7 @@ func TestAllocRunner_RetryArtifact(t *testing.T) {
|
||||
|
||||
func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -772,7 +713,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
|
||||
|
||||
func TestAllocRunner_Destroy(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -828,7 +769,7 @@ func TestAllocRunner_Destroy(t *testing.T) {
|
||||
|
||||
func TestAllocRunner_Update(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, ar := testAllocRunner(t, false)
|
||||
_, ar := TestAllocRunner(t, false)
|
||||
|
||||
// Deep copy the alloc to avoid races when updating
|
||||
newAlloc := ar.Alloc().Copy()
|
||||
@@ -863,7 +804,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
"run_for": "10s",
|
||||
}
|
||||
|
||||
upd, ar := testAllocRunnerFromAlloc(t, alloc, false)
|
||||
upd, ar := TestAllocRunnerFromAlloc(t, alloc, false)
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
@@ -882,9 +823,9 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create a new alloc runner
|
||||
l2 := prefixedTestLogger("----- ar2: ")
|
||||
l2 := testlog.WithPrefix(t, "----- ar2: ")
|
||||
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
|
||||
prevAlloc := NewAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
|
||||
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
|
||||
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
|
||||
err = ar2.RestoreState()
|
||||
@@ -931,8 +872,8 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
|
||||
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
ar.logger = prefixedTestLogger("ar1: ")
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
ar.logger = testlog.WithPrefix(t, "ar1: ")
|
||||
|
||||
// Ensure task takes some time
|
||||
ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
|
||||
@@ -977,9 +918,9 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
|
||||
defer ar.allocLock.Unlock()
|
||||
|
||||
// Create a new alloc runner
|
||||
l2 := prefixedTestLogger("ar2: ")
|
||||
l2 := testlog.WithPrefix(t, "ar2: ")
|
||||
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
|
||||
prevAlloc := NewAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
|
||||
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
|
||||
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
|
||||
err = ar2.RestoreState()
|
||||
@@ -1052,230 +993,9 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// TestAllocRunner_SaveRestoreState_Upgrade asserts that pre-0.6 exec tasks are
|
||||
// restarted on upgrade.
|
||||
func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
|
||||
t.Parallel()
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "10s",
|
||||
}
|
||||
|
||||
upd, ar := testAllocRunnerFromAlloc(t, alloc, false)
|
||||
// Hack in old version to cause an upgrade on RestoreState
|
||||
origConfig := ar.config.Copy()
|
||||
ar.config.Version = &version.VersionInfo{Version: "0.5.6"}
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
// Snapshot state
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
last := upd.Last()
|
||||
if last == nil {
|
||||
return false, fmt.Errorf("No updates")
|
||||
}
|
||||
|
||||
if last.ClientStatus != structs.AllocClientStatusRunning {
|
||||
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("task never started: %v", err)
|
||||
})
|
||||
|
||||
err := ar.SaveState()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create a new alloc runner
|
||||
l2 := prefixedTestLogger("ar2: ")
|
||||
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "")
|
||||
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
|
||||
err = ar2.RestoreState()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
go ar2.Run()
|
||||
defer ar2.Destroy() // Just-in-case of failure before Destroy below
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
last := upd.Last()
|
||||
if last == nil {
|
||||
return false, fmt.Errorf("No updates")
|
||||
}
|
||||
for _, ev := range last.TaskStates["web"].Events {
|
||||
if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, fmt.Errorf("no restart with proper reason found")
|
||||
}, func(err error) {
|
||||
last := upd.Last()
|
||||
t.Fatalf("err: %v\nweb state: % #v", err, pretty.Formatter(last.TaskStates["web"]))
|
||||
})
|
||||
|
||||
// Destroy and wait
|
||||
ar2.Destroy()
|
||||
start := time.Now()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
alloc := ar2.Alloc()
|
||||
if alloc.ClientStatus != structs.AllocClientStatusComplete {
|
||||
return false, fmt.Errorf("Bad client status; got %v; want %v", alloc.ClientStatus, structs.AllocClientStatusComplete)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
last := upd.Last()
|
||||
t.Fatalf("err: %v %#v %#v", err, last, last.TaskStates)
|
||||
})
|
||||
|
||||
if time.Since(start) > time.Duration(testutil.TestMultiplier()*5)*time.Second {
|
||||
t.Fatalf("took too long to terminate")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure pre-#2132 state files containing the Context struct are properly
|
||||
// migrated to the new format.
|
||||
//
|
||||
// Old Context State:
|
||||
//
|
||||
// "Context": {
|
||||
// "AllocDir": {
|
||||
// "AllocDir": "/path/to/allocs/2a54fcff-fc44-8d4f-e025-53c48e9cbbbb",
|
||||
// "SharedDir": "/path/to/allocs/2a54fcff-fc44-8d4f-e025-53c48e9cbbbb/alloc",
|
||||
// "TaskDirs": {
|
||||
// "echo1": "/path/to/allocs/2a54fcff-fc44-8d4f-e025-53c48e9cbbbb/echo1"
|
||||
// }
|
||||
// },
|
||||
// "AllocID": "2a54fcff-fc44-8d4f-e025-53c48e9cbbbb"
|
||||
// }
|
||||
func TestAllocRunner_RestoreOldState(t *testing.T) {
|
||||
t.Parallel()
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "10s",
|
||||
}
|
||||
|
||||
logger := testLogger()
|
||||
conf := config.DefaultConfig()
|
||||
conf.Node = mock.Node()
|
||||
conf.StateDir = os.TempDir()
|
||||
conf.AllocDir = os.TempDir()
|
||||
tmp, err := ioutil.TempFile("", "state-db")
|
||||
if err != nil {
|
||||
t.Fatalf("error creating state db file: %v", err)
|
||||
}
|
||||
db, err := bolt.Open(tmp.Name(), 0600, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating state db: %v", err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil {
|
||||
t.Fatalf("error creating state dir: %v", err)
|
||||
}
|
||||
statePath := filepath.Join(conf.StateDir, "alloc", alloc.ID, "state.json")
|
||||
w, err := os.Create(statePath)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating state file: %v", err)
|
||||
}
|
||||
tmplctx := &struct {
|
||||
AllocID string
|
||||
AllocDir string
|
||||
}{alloc.ID, conf.AllocDir}
|
||||
err = template.Must(template.New("test_state").Parse(`{
|
||||
"Version": "0.5.1",
|
||||
"Alloc": {
|
||||
"ID": "{{ .AllocID }}",
|
||||
"Name": "example",
|
||||
"JobID": "example",
|
||||
"Job": {
|
||||
"ID": "example",
|
||||
"Name": "example",
|
||||
"Type": "batch",
|
||||
"TaskGroups": [
|
||||
{
|
||||
"Name": "example",
|
||||
"Tasks": [
|
||||
{
|
||||
"Name": "example",
|
||||
"Driver": "mock",
|
||||
"Config": {
|
||||
"exit_code": "0",
|
||||
"run_for": "10s"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"TaskGroup": "example",
|
||||
"DesiredStatus": "run",
|
||||
"ClientStatus": "running",
|
||||
"TaskStates": {
|
||||
"example": {
|
||||
"State": "running",
|
||||
"Failed": false,
|
||||
"Events": []
|
||||
}
|
||||
}
|
||||
},
|
||||
"Context": {
|
||||
"AllocDir": {
|
||||
"AllocDir": "{{ .AllocDir }}/{{ .AllocID }}",
|
||||
"SharedDir": "{{ .AllocDir }}/{{ .AllocID }}/alloc",
|
||||
"TaskDirs": {
|
||||
"example": "{{ .AllocDir }}/{{ .AllocID }}/example"
|
||||
}
|
||||
},
|
||||
"AllocID": "{{ .AllocID }}"
|
||||
}
|
||||
}`)).Execute(w, tmplctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error writing state file: %v", err)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
upd := &MockAllocStateUpdater{}
|
||||
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
|
||||
alloc.Job.Type = structs.JobTypeBatch
|
||||
vclient := vaultclient.NewMockVaultClient()
|
||||
cclient := newMockConsulServiceClient(t)
|
||||
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient, noopPrevAlloc{})
|
||||
defer ar.Destroy()
|
||||
|
||||
// RestoreState should fail on the task state since we only test the
|
||||
// alloc state restoring.
|
||||
err = ar.RestoreState()
|
||||
if err == nil {
|
||||
t.Fatal("expected error restoring Task state")
|
||||
}
|
||||
merr, ok := err.(*multierror.Error)
|
||||
if !ok {
|
||||
t.Fatalf("expected RestoreState to return a multierror but found: %T -> %v", err, err)
|
||||
}
|
||||
if len(merr.Errors) != 1 {
|
||||
t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err)
|
||||
}
|
||||
if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) {
|
||||
t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error())
|
||||
}
|
||||
|
||||
if err := ar.SaveState(); err != nil {
|
||||
t.Fatalf("error saving new state: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
|
||||
// Create two tasks in the task group
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -1343,7 +1063,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
|
||||
|
||||
func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
|
||||
// Create two tasks in the task group
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -1417,7 +1137,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
|
||||
// with a leader the leader is stopped before other tasks.
|
||||
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
|
||||
t.Parallel()
|
||||
upd, ar := testAllocRunner(t, false)
|
||||
upd, ar := TestAllocRunner(t, false)
|
||||
|
||||
// Create 3 tasks in the task group
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
@@ -1510,7 +1230,7 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
|
||||
// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
|
||||
func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, ar := testAllocRunner(t, false)
|
||||
_, ar := TestAllocRunner(t, false)
|
||||
defer ar.Destroy()
|
||||
|
||||
// Create a leader and follower task in the task group
|
||||
@@ -1535,11 +1255,11 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
|
||||
ar.alloc.TaskResources[task2.Name] = task2.Resources
|
||||
|
||||
// Mimic Nomad exiting before the leader stopping is able to stop other tasks.
|
||||
ar.tasks = map[string]*TaskRunner{
|
||||
"leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
|
||||
ar.tasks = map[string]*taskrunner.TaskRunner{
|
||||
"leader": taskrunner.NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
|
||||
ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(),
|
||||
ar.vaultClient, ar.consulClient),
|
||||
"follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
|
||||
"follower1": taskrunner.NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
|
||||
ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(),
|
||||
ar.vaultClient, ar.consulClient),
|
||||
}
|
||||
@@ -1606,7 +1326,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
|
||||
task.Config = map[string]interface{}{
|
||||
"run_for": "1s",
|
||||
}
|
||||
upd, ar := testAllocRunnerFromAlloc(t, alloc, false)
|
||||
upd, ar := TestAllocRunnerFromAlloc(t, alloc, false)
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
@@ -1639,10 +1359,10 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
|
||||
task.Config = map[string]interface{}{
|
||||
"run_for": "1s",
|
||||
}
|
||||
upd2, ar2 := testAllocRunnerFromAlloc(t, alloc2, false)
|
||||
upd2, ar2 := TestAllocRunnerFromAlloc(t, alloc2, false)
|
||||
|
||||
// Set prevAlloc like Client does
|
||||
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "")
|
||||
ar2.prevAlloc = NewAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "")
|
||||
|
||||
go ar2.Run()
|
||||
defer ar2.Destroy()
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
@@ -20,6 +20,12 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// getRemoteRetryIntv is minimum interval on which we retry
|
||||
// to fetch remote objects. We pick a value between this and 2x this.
|
||||
getRemoteRetryIntv = 30 * time.Second
|
||||
)
|
||||
|
||||
// rpcer is the interface needed by a prevAllocWatcher to make RPC calls.
|
||||
type rpcer interface {
|
||||
// RPC allows retrieving remote allocs.
|
||||
@@ -49,13 +55,13 @@ type prevAllocWatcher interface {
|
||||
IsMigrating() bool
|
||||
}
|
||||
|
||||
// newAllocWatcher creates a prevAllocWatcher appropriate for whether this
|
||||
// NewAllocWatcher creates a prevAllocWatcher appropriate for whether this
|
||||
// alloc's previous allocation was local or remote. If this alloc has no
|
||||
// previous alloc then a noop implementation is returned.
|
||||
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher {
|
||||
func NewAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher {
|
||||
if alloc.PreviousAllocation == "" {
|
||||
// No previous allocation, use noop transitioner
|
||||
return noopPrevAlloc{}
|
||||
return NoopPrevAlloc{}
|
||||
}
|
||||
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
@@ -295,7 +301,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
|
||||
err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
|
||||
if err != nil {
|
||||
p.logger.Printf("[ERR] client: failed to query previous alloc %q: %v", p.prevAllocID, err)
|
||||
retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv)
|
||||
retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv)
|
||||
select {
|
||||
case <-time.After(retry):
|
||||
continue
|
||||
@@ -386,7 +392,7 @@ func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (strin
|
||||
err := p.rpc.RPC("Node.GetNode", &req, &resp)
|
||||
if err != nil {
|
||||
p.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err)
|
||||
retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv)
|
||||
retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv)
|
||||
select {
|
||||
case <-time.After(retry):
|
||||
continue
|
||||
@@ -568,15 +574,15 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser
|
||||
return nil
|
||||
}
|
||||
|
||||
// noopPrevAlloc does not block or migrate on a previous allocation and never
|
||||
// NoopPrevAlloc does not block or migrate on a previous allocation and never
|
||||
// returns an error.
|
||||
type noopPrevAlloc struct{}
|
||||
type NoopPrevAlloc struct{}
|
||||
|
||||
// Wait returns nil immediately.
|
||||
func (noopPrevAlloc) Wait(context.Context) error { return nil }
|
||||
func (NoopPrevAlloc) Wait(context.Context) error { return nil }
|
||||
|
||||
// Migrate returns nil immediately.
|
||||
func (noopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil }
|
||||
func (NoopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil }
|
||||
|
||||
func (noopPrevAlloc) IsWaiting() bool { return false }
|
||||
func (noopPrevAlloc) IsMigrating() bool { return false }
|
||||
func (NoopPrevAlloc) IsWaiting() bool { return false }
|
||||
func (NoopPrevAlloc) IsMigrating() bool { return false }
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package allocrunner
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
@@ -15,15 +15,15 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/testutil"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
)
|
||||
|
||||
// TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is
|
||||
// set a localPrevAlloc will block on it.
|
||||
func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
|
||||
_, prevAR := testAllocRunner(t, false)
|
||||
_, prevAR := TestAllocRunner(t, false)
|
||||
prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s"
|
||||
|
||||
newAlloc := mock.Alloc()
|
||||
@@ -33,7 +33,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
|
||||
task.Driver = "mock_driver"
|
||||
task.Config["run_for"] = "500ms"
|
||||
|
||||
waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "")
|
||||
waiter := NewAllocWatcher(newAlloc, prevAR, nil, nil, testlog.Logger(t), "")
|
||||
|
||||
// Wait in a goroutine with a context to make sure it exits at the right time
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -177,14 +177,8 @@ func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(dir1)
|
||||
|
||||
c1 := TestClient(t, func(c *config.Config) {
|
||||
c.RPCHandler = nil
|
||||
})
|
||||
defer c1.Shutdown()
|
||||
|
||||
rc := ioutil.NopCloser(buf)
|
||||
|
||||
prevAlloc := &remotePrevAlloc{logger: testLogger()}
|
||||
prevAlloc := &remotePrevAlloc{logger: testlog.Logger(t)}
|
||||
if err := prevAlloc.streamAllocDir(context.Background(), rc, dir1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -234,7 +228,7 @@ func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) {
|
||||
// This test only unit tests streamAllocDir so we only need a partially
|
||||
// complete remotePrevAlloc
|
||||
prevAlloc := &remotePrevAlloc{
|
||||
logger: testLogger(),
|
||||
logger: testlog.Logger(t),
|
||||
allocID: "123",
|
||||
prevAllocID: "abc",
|
||||
migrate: true,
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"fmt"
client/allocrunner/taskrunner/getters.go (new file, 19 lines)
@@ -0,0 +1,19 @@
package taskrunner

// Name returns the name of the task
func (r *TaskRunner) Name() string {
	if r == nil || r.task == nil {
		return ""
	}

	return r.task.Name
}

// IsLeader returns whether the task is a leader task
func (r *TaskRunner) IsLeader() bool {
	if r == nil || r.task == nil {
		return false
	}

	return r.task.Leader
}
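Elsewhere in this diff, call sites that used tr.task.Name and tr.task.Leader switch to these accessors (for example astat.Tasks[tr.Name()] and tr.IsLeader() in the alloc runner), since task is unexported and now lives in a different package. A tiny illustrative sketch, not part of the commit, showing that the getters are deliberately nil-safe:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
)

func main() {
	// A runner that was never constructed: both accessors still return zero
	// values instead of panicking, which keeps callers like AllocRunner simple.
	var tr *taskrunner.TaskRunner
	fmt.Printf("name=%q leader=%v\n", tr.Name(), tr.IsLeader())
}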
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package restarts
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -20,7 +20,7 @@ const (
|
||||
ReasonDelay = "Exceeded allowed attempts, applying a delay"
|
||||
)
|
||||
|
||||
func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTracker {
|
||||
func NewRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTracker {
|
||||
onSuccess := true
|
||||
if jobType == structs.JobTypeBatch {
|
||||
onSuccess = false
|
||||
@@ -54,6 +54,13 @@ func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
r.policy = policy
}

// GetPolicy returns a copy of the policy used to determine restarts.
func (r *RestartTracker) GetPolicy() *structs.RestartPolicy {
r.lock.Lock()
defer r.lock.Unlock()
return r.policy.Copy()
}

// SetStartError is used to mark the most recent start error. If starting was
// successful the error should be nil.
func (r *RestartTracker) SetStartError(err error) *RestartTracker {
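GetPolicy exists because the tracker now lives in its own restarts package: the taskrunner test further down can no longer read the unexported policy field and reads rt.GetPolicy().Mode instead. A small sketch (not part of the commit) of the cross-package usage:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	p := &structs.RestartPolicy{Attempts: 2, Mode: structs.RestartPolicyModeFail}
	rt := restarts.NewRestartTracker(p, structs.JobTypeService)

	// GetPolicy copies under the tracker's lock, so readers in other packages
	// cannot race with SetPolicy or mutate the tracker's own policy.
	fmt.Println(rt.GetPolicy().Mode)
}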
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package restarts
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -32,7 +32,7 @@ func testWaitResult(exit int) *cstructs.WaitResult {
|
||||
func TestClient_RestartTracker_ModeDelay(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeDelay)
|
||||
rt := newRestartTracker(p, structs.JobTypeService)
|
||||
rt := NewRestartTracker(p, structs.JobTypeService)
|
||||
for i := 0; i < p.Attempts; i++ {
|
||||
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
|
||||
if state != structs.TaskRestarting {
|
||||
@@ -58,7 +58,7 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) {
|
||||
func TestClient_RestartTracker_ModeFail(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeFail)
|
||||
rt := newRestartTracker(p, structs.JobTypeSystem)
|
||||
rt := NewRestartTracker(p, structs.JobTypeSystem)
|
||||
for i := 0; i < p.Attempts; i++ {
|
||||
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
|
||||
if state != structs.TaskRestarting {
|
||||
@@ -78,7 +78,7 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) {
|
||||
func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(false, structs.RestartPolicyModeDelay)
|
||||
rt := newRestartTracker(p, structs.JobTypeBatch)
|
||||
rt := NewRestartTracker(p, structs.JobTypeBatch)
|
||||
if state, _ := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated {
|
||||
t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated)
|
||||
}
|
||||
@@ -90,28 +90,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
|
||||
p.Attempts = 0
|
||||
|
||||
// Test with a non-zero exit code
|
||||
rt := newRestartTracker(p, structs.JobTypeService)
|
||||
rt := NewRestartTracker(p, structs.JobTypeService)
|
||||
if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting {
|
||||
t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when)
|
||||
}
|
||||
|
||||
// Even with a zero (successful) exit code non-batch jobs should exit
|
||||
// with TaskNotRestarting
|
||||
rt = newRestartTracker(p, structs.JobTypeService)
|
||||
rt = NewRestartTracker(p, structs.JobTypeService)
|
||||
if state, when := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskNotRestarting {
|
||||
t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when)
|
||||
}
|
||||
|
||||
// Batch jobs with a zero exit code and 0 attempts *do* exit cleanly
|
||||
// with Terminated
|
||||
rt = newRestartTracker(p, structs.JobTypeBatch)
|
||||
rt = NewRestartTracker(p, structs.JobTypeBatch)
|
||||
if state, when := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated {
|
||||
t.Fatalf("expect terminated, got restart/delay: %v/%v", state, when)
|
||||
}
|
||||
|
||||
// Batch jobs with a non-zero exit code and 0 attempts exit with
|
||||
// TaskNotRestarting
|
||||
rt = newRestartTracker(p, structs.JobTypeBatch)
|
||||
rt = NewRestartTracker(p, structs.JobTypeBatch)
|
||||
if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting {
|
||||
t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when)
|
||||
}
|
||||
@@ -121,7 +121,7 @@ func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeFail)
|
||||
p.Attempts = 0
|
||||
rt := newRestartTracker(p, structs.JobTypeService)
|
||||
rt := NewRestartTracker(p, structs.JobTypeService)
|
||||
if state, when := rt.SetRestartTriggered(false).GetState(); state != structs.TaskRestarting && when != 0 {
|
||||
t.Fatalf("expect restart immediately, got %v %v", state, when)
|
||||
}
|
||||
@@ -131,7 +131,7 @@ func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeFail)
|
||||
p.Attempts = 1
|
||||
rt := newRestartTracker(p, structs.JobTypeService)
|
||||
rt := NewRestartTracker(p, structs.JobTypeService)
|
||||
if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskRestarting || when == 0 {
|
||||
t.Fatalf("expect restart got %v %v", state, when)
|
||||
}
|
||||
@@ -143,7 +143,7 @@ func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) {
|
||||
func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeFail)
|
||||
rt := newRestartTracker(p, structs.JobTypeSystem)
|
||||
rt := NewRestartTracker(p, structs.JobTypeSystem)
|
||||
recErr := structs.NewRecoverableError(fmt.Errorf("foo"), true)
|
||||
for i := 0; i < p.Attempts; i++ {
|
||||
state, when := rt.SetStartError(recErr).GetState()
|
||||
@@ -164,7 +164,7 @@ func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
|
||||
func TestClient_RestartTracker_StartError_Recoverable_Delay(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeDelay)
|
||||
rt := newRestartTracker(p, structs.JobTypeSystem)
|
||||
rt := NewRestartTracker(p, structs.JobTypeSystem)
|
||||
recErr := structs.NewRecoverableError(fmt.Errorf("foo"), true)
|
||||
for i := 0; i < p.Attempts; i++ {
|
||||
state, when := rt.SetStartError(recErr).GetState()
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -21,9 +21,12 @@ import (
|
||||
"github.com/hashicorp/go-multierror"
|
||||
version "github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/getter"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/client/getter"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -89,8 +92,8 @@ type TaskRunner struct {
|
||||
config *config.Config
|
||||
updater TaskStateUpdater
|
||||
logger *log.Logger
|
||||
restartTracker *RestartTracker
|
||||
consul ConsulServiceAPI
|
||||
restartTracker *restarts.RestartTracker
|
||||
consul consulApi.ConsulServiceAPI
|
||||
|
||||
// running marks whether the task is running
|
||||
running bool
|
||||
@@ -230,7 +233,7 @@ type SignalEvent struct {
|
||||
func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||
stateDB *bolt.DB, updater TaskStateUpdater, taskDir *allocdir.TaskDir,
|
||||
alloc *structs.Allocation, task *structs.Task,
|
||||
vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner {
|
||||
vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI) *TaskRunner {
|
||||
|
||||
// Merge in the task resources
|
||||
task.Resources = alloc.TaskResources[task.Name]
|
||||
@@ -241,7 +244,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||
logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup)
|
||||
return nil
|
||||
}
|
||||
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
|
||||
restartTracker := restarts.NewRestartTracker(tg.RestartPolicy, alloc.Job.Type)
|
||||
|
||||
// Initialize the environment builder
|
||||
envBuilder := env.NewBuilder(config.Node, alloc, task, config.Region)
|
||||
@@ -329,40 +332,20 @@ func (r *TaskRunner) pre060StateFilePath() string {
|
||||
// backwards incompatible upgrades that need to restart tasks with a new
|
||||
// executor.
|
||||
func (r *TaskRunner) RestoreState() (string, error) {
|
||||
// COMPAT: Remove in 0.7.0
|
||||
// 0.6.0 transitioned from individual state files to a single bolt-db.
|
||||
// The upgrade path is to:
|
||||
// Check if old state exists
|
||||
// If so, restore from that and delete old state
|
||||
// Restore using state database
|
||||
|
||||
var snap taskRunnerState
|
||||
|
||||
// Check if the old snapshot is there
|
||||
oldPath := r.pre060StateFilePath()
|
||||
if err := pre060RestoreState(oldPath, &snap); err == nil {
|
||||
// Delete the old state
|
||||
os.RemoveAll(oldPath)
|
||||
} else if !os.IsNotExist(err) {
|
||||
// Something corrupt in the old state file
|
||||
return "", err
|
||||
} else {
|
||||
// We are doing a normal restore
|
||||
err := r.stateDB.View(func(tx *bolt.Tx) error {
|
||||
bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get task bucket: %v", err)
|
||||
}
|
||||
|
||||
if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil {
|
||||
return fmt.Errorf("failed to read task runner state: %v", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
err := r.stateDB.View(func(tx *bolt.Tx) error {
|
||||
bkt, err := state.GetTaskBucket(tx, r.alloc.ID, r.task.Name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return fmt.Errorf("failed to get task bucket: %v", err)
|
||||
}
|
||||
|
||||
if err := state.GetObject(bkt, taskRunnerStateAllKey, &snap); err != nil {
|
||||
return fmt.Errorf("failed to read task runner state: %v", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Restore fields from the snapshot
|
||||
@@ -510,12 +493,12 @@ func (r *TaskRunner) SaveState() error {
// Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error {
// Grab the task bucket
taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name)
taskBkt, err := state.GetTaskBucket(tx, r.alloc.ID, r.task.Name)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}

if err := putData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
if err := state.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}

@@ -534,7 +517,7 @@ func (r *TaskRunner) DestroyState() error {
defer r.persistLock.Unlock()

return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil {
if err := state.DeleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil {
return fmt.Errorf("failed to delete task bucket: %v", err)
}
return nil
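SaveState and DestroyState are the mirror image of that restore path: a Batch transaction that writes one key into the task bucket, and an Update transaction that drops the bucket. A self-contained sketch of the write/delete shape, with placeholder bucket and key names rather than the client/state ones:

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

// saveTaskState mirrors the Batch-based write above; Batch lets Bolt coalesce
// concurrent writers into fewer transactions.
func saveTaskState(db *bolt.DB, payload []byte) error {
	return db.Batch(func(tx *bolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists([]byte("tasks")) // assumed bucket name
		if err != nil {
			return fmt.Errorf("failed to create task bucket: %v", err)
		}
		return bkt.Put([]byte("simple-all"), payload) // assumed key
	})
}

// destroyTaskState mirrors the Update-based delete above.
func destroyTaskState(db *bolt.DB) error {
	return db.Update(func(tx *bolt.Tx) error {
		if err := tx.DeleteBucket([]byte("tasks")); err != nil && err != bolt.ErrBucketNotFound {
			return fmt.Errorf("failed to delete task bucket: %v", err)
		}
		return nil
	})
}

func main() {
	db, err := bolt.Open("state.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := saveTaskState(db, []byte(`{"version":"1"}`)); err != nil {
		log.Fatal(err)
	}
	if err := destroyTaskState(db); err != nil {
		log.Fatal(err)
	}
	log.Println("wrote and deleted task state")
}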
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package taskrunner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -17,7 +17,9 @@ import (
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
@@ -40,9 +42,9 @@ func prefixedTestLogger(prefix string) *log.Logger {
|
||||
}
|
||||
|
||||
// Returns a tracker that never restarts.
|
||||
func noRestartsTracker() *RestartTracker {
|
||||
func noRestartsTracker() *restarts.RestartTracker {
|
||||
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
|
||||
return newRestartTracker(policy, structs.JobTypeBatch)
|
||||
return restarts.NewRestartTracker(policy, structs.JobTypeBatch)
|
||||
}
|
||||
|
||||
type MockTaskStateUpdater struct {
|
||||
@@ -387,8 +389,8 @@ func TestTaskRunner_Update(t *testing.T) {
|
||||
if ctx.tr.task.Driver != newTask.Driver {
|
||||
return false, fmt.Errorf("Task not copied")
|
||||
}
|
||||
if ctx.tr.restartTracker.policy.Mode != newMode {
|
||||
return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.policy.Mode)
|
||||
if ctx.tr.restartTracker.GetPolicy().Mode != newMode {
|
||||
return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.GetPolicy().Mode)
|
||||
}
|
||||
if ctx.tr.handle.ID() == oldHandle {
|
||||
return false, fmt.Errorf("handle not ctx.updated")
|
||||
@@ -642,7 +644,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
|
||||
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
||||
|
||||
// Use mockConsulServiceClient
|
||||
consul := newMockConsulServiceClient(t)
|
||||
consul := consulApi.NewMockConsulServiceClient(t)
|
||||
ctx.tr.consul = consul
|
||||
|
||||
ctx.tr.MarkReceived()
|
||||
@@ -650,26 +652,26 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// Assert it is properly registered and unregistered
|
||||
if expected := 6; len(consul.ops) != expected {
|
||||
t.Errorf("expected %d consul ops but found: %d", expected, len(consul.ops))
|
||||
if expected := 6; len(consul.Ops) != expected {
|
||||
t.Errorf("expected %d consul ops but found: %d", expected, len(consul.Ops))
|
||||
}
|
||||
if consul.ops[0].op != "add" {
|
||||
t.Errorf("expected first op to be add but found: %q", consul.ops[0].op)
|
||||
if consul.Ops[0].Op != "add" {
|
||||
t.Errorf("expected first Op to be add but found: %q", consul.Ops[0].Op)
|
||||
}
|
||||
if consul.ops[1].op != "remove" {
|
||||
t.Errorf("expected second op to be remove but found: %q", consul.ops[1].op)
|
||||
if consul.Ops[1].Op != "remove" {
|
||||
t.Errorf("expected second op to be remove but found: %q", consul.Ops[1].Op)
|
||||
}
|
||||
if consul.ops[2].op != "remove" {
|
||||
t.Errorf("expected third op to be remove but found: %q", consul.ops[2].op)
|
||||
if consul.Ops[2].Op != "remove" {
|
||||
t.Errorf("expected third op to be remove but found: %q", consul.Ops[2].Op)
|
||||
}
|
||||
if consul.ops[3].op != "add" {
|
||||
t.Errorf("expected fourth op to be add but found: %q", consul.ops[3].op)
|
||||
if consul.Ops[3].Op != "add" {
|
||||
t.Errorf("expected fourth op to be add but found: %q", consul.Ops[3].Op)
|
||||
}
|
||||
if consul.ops[4].op != "remove" {
|
||||
t.Errorf("expected fifth op to be remove but found: %q", consul.ops[4].op)
|
||||
if consul.Ops[4].Op != "remove" {
|
||||
t.Errorf("expected fifth op to be remove but found: %q", consul.Ops[4].Op)
|
||||
}
|
||||
if consul.ops[5].op != "remove" {
|
||||
t.Errorf("expected sixth op to be remove but found: %q", consul.ops[5].op)
|
||||
if consul.Ops[5].Op != "remove" {
|
||||
t.Errorf("expected sixth op to be remove but found: %q", consul.Ops[5].Op)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
// +build !windows

package client
package taskrunner

import (
"syscall"
client/allocrunner/testing.go (new file, 65 lines)
@@ -0,0 +1,65 @@
package allocrunner

import (
"io/ioutil"
"os"
"sync"
"testing"

"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)

type MockAllocStateUpdater struct {
Allocs []*structs.Allocation
mu sync.Mutex
}

// Update fulfills the TaskStateUpdater interface
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
m.mu.Lock()
m.Allocs = append(m.Allocs, alloc)
m.mu.Unlock()
}

// Last returns a copy of the last alloc (or nil) sync'd
func (m *MockAllocStateUpdater) Last() *structs.Allocation {
m.mu.Lock()
defer m.mu.Unlock()
n := len(m.Allocs)
if n == 0 {
return nil
}
return m.Allocs[n-1].Copy()
}

func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
conf := config.DefaultConfig()
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, _ := ioutil.TempFile("", "state-db")
db, _ := bolt.Open(tmp.Name(), 0600, nil)
upd := &MockAllocStateUpdater{}
if !restarts {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t), NoopPrevAlloc{})
return upd, ar
}

func TestAllocRunner(t *testing.T, restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
// Use mock driver
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "500ms"
return TestAllocRunnerFromAlloc(t, alloc, restarts)
}
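The two exported helpers above replace test-local constructors so that packages outside allocrunner can spin up a runner without copying the setup. A hypothetical caller could look like the sketch below; Run and the completed client status come from the pre-existing AllocRunner API and are assumptions here, not something this file adds.

package allocrunner_test

import (
	"fmt"
	"testing"

	"github.com/hashicorp/nomad/client/allocrunner"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
)

func TestExample_TestAllocRunner(t *testing.T) {
	// restarts=false swaps in the zero-attempt batch policy set by the helper.
	upd, ar := allocrunner.TestAllocRunner(t, false)
	go ar.Run() // Run is assumed from the existing AllocRunner API

	// Poll the mock updater until the 500ms mock_driver task finishes.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("no status update synced yet")
		}
		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("alloc still %q", last.ClientStatus)
		}
		return true, nil
	}, func(err error) {
		t.Fatal(err)
	})
}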
@@ -21,8 +21,11 @@ import (
|
||||
"github.com/hashicorp/consul/lib"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/servers"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
@@ -88,7 +91,7 @@ const (
|
||||
type ClientStatsReporter interface {
|
||||
// GetAllocStats returns the AllocStatsReporter for the passed allocation.
|
||||
// If it does not exist an error is reported.
|
||||
GetAllocStats(allocID string) (AllocStatsReporter, error)
|
||||
GetAllocStats(allocID string) (allocrunner.AllocStatsReporter, error)
|
||||
|
||||
// LatestHostStats returns the latest resource usage stats for the host
|
||||
LatestHostStats() *stats.HostStats
|
||||
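Only the return type of GetAllocStats changes here; callers keep the same shape, asking for per-allocation stats by ID and falling back to host-level stats. A small hypothetical consumer of the interface as declared above:

package example

import (
	"fmt"

	"github.com/hashicorp/nomad/client"
)

// reportAlloc is illustrative only; it exercises just the two methods shown
// in the interface above and leaves AllocStatsReporter itself untouched.
func reportAlloc(r client.ClientStatsReporter, allocID string) error {
	if _, err := r.GetAllocStats(allocID); err != nil {
		return fmt.Errorf("no stats for alloc %s: %v", allocID, err)
	}
	fmt.Printf("latest host stats: %+v\n", r.LatestHostStats())
	return nil
}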
@@ -145,7 +148,7 @@ type Client struct {
|
||||
|
||||
// allocs maps alloc IDs to their AllocRunner. This map includes all
|
||||
// AllocRunners - running and GC'd - until the server GCs them.
|
||||
allocs map[string]*AllocRunner
|
||||
allocs map[string]*allocrunner.AllocRunner
|
||||
allocLock sync.RWMutex
|
||||
|
||||
// allocUpdates stores allocations that need to be synced to the server.
|
||||
@@ -153,7 +156,7 @@ type Client struct {
|
||||
|
||||
// consulService is Nomad's custom Consul client for managing services
|
||||
// and checks.
|
||||
consulService ConsulServiceAPI
|
||||
consulService consulApi.ConsulServiceAPI
|
||||
|
||||
// consulCatalog is the subset of Consul's Catalog API Nomad uses.
|
||||
consulCatalog consul.CatalogAPI
|
||||
@@ -193,7 +196,7 @@ var (
|
||||
)
|
||||
|
||||
// NewClient is used to create a new client from the given configuration
|
||||
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService ConsulServiceAPI, logger *log.Logger) (*Client, error) {
|
||||
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI, logger *log.Logger) (*Client, error) {
|
||||
// Create the tls wrapper
|
||||
var tlsWrap tlsutil.RegionWrapper
|
||||
if cfg.TLSConfig.EnableRPC {
|
||||
@@ -217,7 +220,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
tlsWrap: tlsWrap,
|
||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||
logger: logger,
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
allocs: make(map[string]*allocrunner.AllocRunner),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
triggerDiscoveryCh: make(chan struct{}),
|
||||
@@ -562,7 +565,7 @@ func (c *Client) StatsReporter() ClientStatsReporter {
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {
|
||||
func (c *Client) GetAllocStats(allocID string) (allocrunner.AllocStatsReporter, error) {
|
||||
c.allocLock.RLock()
|
||||
defer c.allocLock.RUnlock()
|
||||
ar, ok := c.allocs[allocID]
|
||||
@@ -714,7 +717,7 @@ func (c *Client) restoreState() error {
|
||||
} else {
|
||||
// Normal path
|
||||
err := c.stateDB.View(func(tx *bolt.Tx) error {
|
||||
allocs, err = getAllAllocationIDs(tx)
|
||||
allocs, err = state.GetAllAllocationIDs(tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list allocations: %v", err)
|
||||
}
|
||||
@@ -731,10 +734,10 @@ func (c *Client) restoreState() error {
|
||||
alloc := &structs.Allocation{ID: id}
|
||||
|
||||
// don't worry about blocking/migrating when restoring
|
||||
watcher := noopPrevAlloc{}
|
||||
watcher := allocrunner.NoopPrevAlloc{}
|
||||
|
||||
c.configLock.RLock()
|
||||
ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
|
||||
ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
|
||||
c.configLock.RUnlock()
|
||||
|
||||
c.allocLock.Lock()
|
||||
@@ -778,7 +781,7 @@ func (c *Client) saveState() error {
|
||||
wg.Add(len(runners))
|
||||
|
||||
for id, ar := range runners {
|
||||
go func(id string, ar *AllocRunner) {
|
||||
go func(id string, ar *allocrunner.AllocRunner) {
|
||||
err := ar.SaveState()
|
||||
if err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to save state for alloc %q: %v", id, err)
|
||||
@@ -795,10 +798,10 @@ func (c *Client) saveState() error {
|
||||
}
|
||||
|
||||
// getAllocRunners returns a snapshot of the current set of alloc runners.
|
||||
func (c *Client) getAllocRunners() map[string]*AllocRunner {
|
||||
func (c *Client) getAllocRunners() map[string]*allocrunner.AllocRunner {
|
||||
c.allocLock.RLock()
|
||||
defer c.allocLock.RUnlock()
|
||||
runners := make(map[string]*AllocRunner, len(c.allocs))
|
||||
runners := make(map[string]*allocrunner.AllocRunner, len(c.allocs))
|
||||
for id, ar := range c.allocs {
|
||||
runners[id] = ar
|
||||
}
|
||||
@@ -1677,7 +1680,7 @@ OUTER:
|
||||
// allocation or if the alloc runner requires an updated allocation.
|
||||
runner, ok := runners[allocID]
|
||||
|
||||
if !ok || runner.shouldUpdate(modifyIndex) {
|
||||
if !ok || runner.ShouldUpdate(modifyIndex) {
|
||||
// Only pull allocs that are required. Filtered
|
||||
// allocs might be at a higher index, so ignore
|
||||
// it.
|
||||
@@ -1810,7 +1813,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
|
||||
c.allocLock.RLock()
|
||||
exist := make([]*structs.Allocation, 0, len(c.allocs))
|
||||
for _, ar := range c.allocs {
|
||||
exist = append(exist, ar.alloc)
|
||||
exist = append(exist, ar.Alloc())
|
||||
}
|
||||
c.allocLock.RUnlock()
|
||||
|
||||
@@ -1899,18 +1902,18 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
||||
|
||||
// get the previous alloc runner - if one exists - for the
|
||||
// blocking/migrating watcher
|
||||
var prevAR *AllocRunner
|
||||
var prevAR *allocrunner.AllocRunner
|
||||
if alloc.PreviousAllocation != "" {
|
||||
prevAR = c.allocs[alloc.PreviousAllocation]
|
||||
}
|
||||
|
||||
c.configLock.RLock()
|
||||
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
|
||||
prevAlloc := allocrunner.NewAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
|
||||
|
||||
// Copy the config since the node can be swapped out as it is being updated.
|
||||
// The long term fix is to pass in the config and node separately and then
|
||||
// we don't have to do a copy.
|
||||
ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
|
||||
ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
|
||||
c.configLock.RUnlock()
|
||||
|
||||
// Store the alloc runner.
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
@@ -603,8 +604,8 @@ func TestClient_SaveRestoreState(t *testing.T) {
|
||||
// Create a new client
|
||||
logger := log.New(c1.config.LogOutput, "", log.LstdFlags)
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
mockService := newMockConsulServiceClient(t)
|
||||
mockService.logger = logger
|
||||
mockService := consulApi.NewMockConsulServiceClient(t)
|
||||
mockService.Logger = logger
|
||||
c2, err := NewClient(c1.config, catalog, mockService, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
@@ -1,4 +1,4 @@
package client
package consul

import (
"github.com/hashicorp/nomad/command/agent/consul"
client/consul/consul_testing.go (new file, 86 lines)
@@ -0,0 +1,86 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
// MockConsulOp represents the register/deregister operations.
|
||||
type MockConsulOp struct {
|
||||
Op string // add, remove, or update
|
||||
AllocID string
|
||||
Task string
|
||||
}
|
||||
|
||||
func NewMockConsulOp(op, allocID, task string) MockConsulOp {
|
||||
if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" {
|
||||
panic(fmt.Errorf("invalid consul op: %s", op))
|
||||
}
|
||||
return MockConsulOp{
|
||||
Op: op,
|
||||
AllocID: allocID,
|
||||
Task: task,
|
||||
}
|
||||
}
|
||||
|
||||
// MockConsulServiceClient implements the ConsulServiceAPI interface to record
|
||||
// and log task registration/deregistration.
|
||||
type MockConsulServiceClient struct {
|
||||
Ops []MockConsulOp
|
||||
mu sync.Mutex
|
||||
|
||||
Logger *log.Logger
|
||||
|
||||
// AllocRegistrationsFn allows injecting return values for the
|
||||
// AllocRegistrations function.
|
||||
AllocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error)
|
||||
}
|
||||
|
||||
func NewMockConsulServiceClient(t testing.T) *MockConsulServiceClient {
|
||||
m := MockConsulServiceClient{
|
||||
Ops: make([]MockConsulOp, 0, 20),
|
||||
Logger: testlog.Logger(t),
|
||||
}
|
||||
return &m
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("update", new.AllocID, new.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) RegisterTask(task *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("add", task.AllocID, task.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) RemoveTask(task *consul.TaskServices) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("remove", task.AllocID, task.Name))
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("alloc_registrations", allocID, ""))
|
||||
|
||||
if m.AllocRegistrationsFn != nil {
|
||||
return m.AllocRegistrationsFn(allocID)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
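Exporting the mock, its Ops slice, and the Op field is what lets the task runner tests shown earlier assert on the recorded operation sequence from another package. A hypothetical assertion written against only the API added above:

package consul_test

import (
	"testing"

	consulApi "github.com/hashicorp/nomad/client/consul"
	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
)

func TestExample_MockConsulServiceClient(t *testing.T) {
	m := consulApi.NewMockConsulServiceClient(t)

	// Register and then remove a dummy task; the mock only records the ops.
	ts := &agentconsul.TaskServices{AllocID: "alloc-1234", Name: "web"}
	if err := m.RegisterTask(ts); err != nil {
		t.Fatalf("register failed: %v", err)
	}
	m.RemoveTask(ts)

	if len(m.Ops) != 2 || m.Ops[0].Op != "add" || m.Ops[1].Op != "remove" {
		t.Fatalf("unexpected ops recorded: %#v", m.Ops)
	}
}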
@@ -1,86 +0,0 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
// mockConsulOp represents the register/deregister operations.
|
||||
type mockConsulOp struct {
|
||||
op string // add, remove, or update
|
||||
allocID string
|
||||
task string
|
||||
}
|
||||
|
||||
func newMockConsulOp(op, allocID, task string) mockConsulOp {
|
||||
if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" {
|
||||
panic(fmt.Errorf("invalid consul op: %s", op))
|
||||
}
|
||||
return mockConsulOp{
|
||||
op: op,
|
||||
allocID: allocID,
|
||||
task: task,
|
||||
}
|
||||
}
|
||||
|
||||
// mockConsulServiceClient implements the ConsulServiceAPI interface to record
|
||||
// and log task registration/deregistration.
|
||||
type mockConsulServiceClient struct {
|
||||
ops []mockConsulOp
|
||||
mu sync.Mutex
|
||||
|
||||
logger *log.Logger
|
||||
|
||||
// allocRegistrationsFn allows injecting return values for the
|
||||
// AllocRegistrations function.
|
||||
allocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error)
|
||||
}
|
||||
|
||||
func newMockConsulServiceClient(t testing.T) *mockConsulServiceClient {
|
||||
m := mockConsulServiceClient{
|
||||
ops: make([]mockConsulOp, 0, 20),
|
||||
logger: testlog.Logger(t),
|
||||
}
|
||||
return &m
|
||||
}
|
||||
|
||||
func (m *mockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name)
|
||||
m.ops = append(m.ops, newMockConsulOp("update", new.AllocID, new.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConsulServiceClient) RegisterTask(task *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name)
|
||||
m.ops = append(m.ops, newMockConsulOp("add", task.AllocID, task.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConsulServiceClient) RemoveTask(task *consul.TaskServices) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name)
|
||||
m.ops = append(m.ops, newMockConsulOp("remove", task.AllocID, task.Name))
|
||||
}
|
||||
|
||||
func (m *mockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID)
|
||||
m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, ""))
|
||||
|
||||
if m.allocRegistrationsFn != nil {
|
||||
return m.allocRegistrationsFn(allocID)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -25,7 +26,7 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) {
|
||||
testClient.shutdownCh,
|
||||
testClient.updateNodeFromFingerprint,
|
||||
testClient.updateNodeFromDriver,
|
||||
testLogger(),
|
||||
testlog.Logger(t),
|
||||
)
|
||||
|
||||
err := fm.Run()
|
||||
@@ -43,7 +44,7 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, nil)
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -70,7 +71,7 @@ func TestFingerprintManager_Fingerprint_Run(t *testing.T) {
|
||||
require := require.New(t)
|
||||
testClient := TestClient(t, nil)
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -102,7 +103,7 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -164,7 +165,7 @@ func TestFingerprintManager_HealthCheck_Driver(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -264,7 +265,7 @@ func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -360,7 +361,7 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -390,7 +391,7 @@ func TestFingerprintManager_Run_InBlacklist(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -422,7 +423,7 @@ func TestFingerprintManager_Run_Combination(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -455,7 +456,7 @@ func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -485,7 +486,7 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -519,7 +520,7 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing.
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
@@ -552,7 +553,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
testClient.logger = testLogger()
|
||||
testClient.logger = testlog.Logger(t)
|
||||
defer testClient.Shutdown()
|
||||
|
||||
fm := NewFingerprintManager(
|
||||
|
||||
@@ -92,7 +92,17 @@ func TestFS_Stat(t *testing.T) {
|
||||
return false, fmt.Errorf("alloc doesn't exist")
|
||||
}
|
||||
|
||||
return len(ar.tasks) != 0, fmt.Errorf("tasks not running")
|
||||
alloc := ar.Alloc()
|
||||
running := false
|
||||
for _, s := range alloc.TaskStates {
|
||||
if s.State == structs.TaskStateRunning {
|
||||
running = true
|
||||
} else {
|
||||
running = false
|
||||
}
|
||||
}
|
||||
|
||||
return running, fmt.Errorf("tasks not running")
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
@@ -217,7 +227,17 @@ func TestFS_List(t *testing.T) {
|
||||
return false, fmt.Errorf("alloc doesn't exist")
|
||||
}
|
||||
|
||||
return len(ar.tasks) != 0, fmt.Errorf("tasks not running")
|
||||
alloc := ar.Alloc()
|
||||
running := false
|
||||
for _, s := range alloc.TaskStates {
|
||||
if s.State == structs.TaskStateRunning {
|
||||
running = true
|
||||
} else {
|
||||
running = false
|
||||
}
|
||||
}
|
||||
|
||||
return running, fmt.Errorf("tasks not running")
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocrunner"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
@@ -169,7 +170,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
|
||||
// destroyAllocRunner is used to destroy an allocation runner. It will acquire a
|
||||
// lock to restrict parallelism and then destroy the alloc runner, returning
|
||||
// once the allocation has been destroyed.
|
||||
func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason string) {
|
||||
func (a *AllocGarbageCollector) destroyAllocRunner(ar *allocrunner.AllocRunner, reason string) {
|
||||
id := "<nil>"
|
||||
if alloc := ar.Alloc(); alloc != nil {
|
||||
id = alloc.ID
|
||||
@@ -327,7 +328,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
|
||||
}
|
||||
|
||||
// MarkForCollection starts tracking an allocation for Garbage Collection
|
||||
func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) {
|
||||
func (a *AllocGarbageCollector) MarkForCollection(ar *allocrunner.AllocRunner) {
|
||||
if ar.Alloc() == nil {
|
||||
a.destroyAllocRunner(ar, "alloc is nil")
|
||||
return
|
||||
@@ -342,7 +343,7 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) {
|
||||
// a PQ
|
||||
type GCAlloc struct {
|
||||
timeStamp time.Time
|
||||
allocRunner *AllocRunner
|
||||
allocRunner *allocrunner.AllocRunner
|
||||
index int
|
||||
}
|
||||
|
||||
@@ -396,7 +397,7 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
|
||||
|
||||
// Push an alloc runner into the GC queue. Returns true if alloc was added,
|
||||
// false if the alloc already existed.
|
||||
func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) bool {
|
||||
func (i *IndexedGCAllocPQ) Push(ar *allocrunner.AllocRunner) bool {
|
||||
i.pqLock.Lock()
|
||||
defer i.pqLock.Unlock()
|
||||
|
||||
|
||||
@@ -1,17 +1,6 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
)
|
||||
|
||||
/*
|
||||
func gcConfig() *GCConfig {
|
||||
return &GCConfig{
|
||||
DiskUsageThreshold: 80,
|
||||
@@ -26,10 +15,10 @@ func TestIndexedGCAllocPQ(t *testing.T) {
|
||||
t.Parallel()
|
||||
pq := NewIndexedGCAllocPQ()
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar3 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar4 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar3 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar4 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
|
||||
pq.Push(ar1)
|
||||
pq.Push(ar2)
|
||||
@@ -105,10 +94,10 @@ func (m *MockStatsCollector) Stats() *stats.HostStats {
|
||||
|
||||
func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
gc.MarkForCollection(ar1)
|
||||
|
||||
gcAlloc := gc.allocRunners.Pop()
|
||||
@@ -119,11 +108,11 @@ func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_Collect(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
|
||||
@@ -140,11 +129,11 @@ func TestAllocGarbageCollector_Collect(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_CollectAll(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
|
||||
@@ -157,15 +146,15 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar1.waitCh)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar2.waitCh)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
@@ -192,15 +181,15 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T)
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar1.waitCh)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar2.waitCh)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
@@ -228,15 +217,15 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar1.waitCh)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar2.waitCh)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
@@ -260,15 +249,15 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar1.waitCh)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar2.waitCh)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
@@ -419,15 +408,15 @@ func TestAllocGarbageCollector_MaxAllocs(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar1.waitCh)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar2.waitCh)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
@@ -451,15 +440,15 @@ func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
|
||||
|
||||
func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testLogger()
|
||||
logger := testlog.Logger(t)
|
||||
statsCollector := &MockStatsCollector{}
|
||||
conf := gcConfig()
|
||||
conf.ReservedDiskMB = 20
|
||||
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
|
||||
|
||||
_, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar1.waitCh)
|
||||
_, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
_, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false)
|
||||
close(ar2.waitCh)
|
||||
gc.MarkForCollection(ar1)
|
||||
gc.MarkForCollection(ar2)
|
||||
@@ -482,3 +471,4 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
|
||||
t.Fatalf("gcAlloc: %v", gcAlloc)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package client
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -25,7 +25,7 @@ var (
|
||||
allocationsBucket = []byte("allocations")
|
||||
)
|
||||
|
||||
func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
|
||||
func PutObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
|
||||
if !bkt.Writable() {
|
||||
return fmt.Errorf("bucket must be writable")
|
||||
}
|
||||
@@ -43,7 +43,7 @@ func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func putData(bkt *bolt.Bucket, key, value []byte) error {
|
||||
func PutData(bkt *bolt.Bucket, key, value []byte) error {
|
||||
if !bkt.Writable() {
|
||||
return fmt.Errorf("bucket must be writable")
|
||||
}
|
||||
@@ -55,7 +55,7 @@ func putData(bkt *bolt.Bucket, key, value []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
|
||||
func GetObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
|
||||
// Get the data
|
||||
data := bkt.Get(key)
|
||||
if data == nil {
|
||||
@@ -70,11 +70,11 @@ func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAllocationBucket returns the bucket used to persist state about a
|
||||
// GetAllocationBucket returns the bucket used to persist state about a
|
||||
// particular allocation. If the root allocation bucket or the specific
|
||||
// allocation bucket doesn't exist, it will be created as long as the
|
||||
// transaction is writable.
|
||||
func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
|
||||
func GetAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
|
||||
var err error
|
||||
w := tx.Writable()
|
||||
|
||||
@@ -108,12 +108,12 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
|
||||
return alloc, nil
|
||||
}
|
||||
|
||||
// getTaskBucket returns the bucket used to persist state about a
|
||||
// GetTaskBucket returns the bucket used to persist state about a
|
||||
// particular task. If the root allocation bucket, the specific
|
||||
// allocation or task bucket doesn't exist, they will be created as long as the
|
||||
// transaction is writable.
|
||||
func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) {
|
||||
alloc, err := getAllocationBucket(tx, allocID)
|
||||
func GetTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) {
|
||||
alloc, err := GetAllocationBucket(tx, allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -136,8 +136,8 @@ func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error)
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// deleteAllocationBucket is used to delete an allocation bucket if it exists.
|
||||
func deleteAllocationBucket(tx *bolt.Tx, allocID string) error {
|
||||
// DeleteAllocationBucket is used to delete an allocation bucket if it exists.
|
||||
func DeleteAllocationBucket(tx *bolt.Tx, allocID string) error {
|
||||
if !tx.Writable() {
|
||||
return fmt.Errorf("transaction must be writable")
|
||||
}
|
||||
@@ -157,8 +157,8 @@ func deleteAllocationBucket(tx *bolt.Tx, allocID string) error {
|
||||
return allocations.DeleteBucket(key)
|
||||
}
|
||||
|
||||
// deleteTaskBucket is used to delete a task bucket if it exists.
|
||||
func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
|
||||
// DeleteTaskBucket is used to delete a task bucket if it exists.
|
||||
func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
|
||||
if !tx.Writable() {
|
||||
return fmt.Errorf("transaction must be writable")
|
||||
}
|
||||
@@ -184,7 +184,7 @@ func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
|
||||
return alloc.DeleteBucket(key)
|
||||
}
|
||||
|
||||
func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) {
|
||||
func GetAllAllocationIDs(tx *bolt.Tx) ([]string, error) {
|
||||
allocationsBkt := tx.Bucket(allocationsBucket)
|
||||
if allocationsBkt == nil {
|
||||
return nil, nil
|
||||
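Taken together, the exported helpers give other client packages a compact persistence path: open a writable transaction, resolve (or create) the task bucket, then put or get an object. A sketch under the signatures shown above; the allocation ID, task name, key, and snapshot type are placeholders.

package main

import (
	"log"

	"github.com/boltdb/bolt"
	"github.com/hashicorp/nomad/client/state"
)

type snapshot struct {
	Task    string
	Healthy bool
}

func main() {
	db, err := bolt.Open("client.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Write one object under the task's bucket.
	err = db.Update(func(tx *bolt.Tx) error {
		bkt, err := state.GetTaskBucket(tx, "alloc-1234", "web")
		if err != nil {
			return err
		}
		return state.PutObject(bkt, []byte("snapshot"), &snapshot{Task: "web", Healthy: true})
	})
	if err != nil {
		log.Fatal(err)
	}

	// Read it back in a read-only transaction; the buckets already exist now.
	var out snapshot
	err = db.View(func(tx *bolt.Tx) error {
		bkt, err := state.GetTaskBucket(tx, "alloc-1234", "web")
		if err != nil {
			return err
		}
		return state.GetObject(bkt, []byte("snapshot"), &out)
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("restored: %+v", out)
}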
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
@@ -37,8 +38,8 @@ func TestClient(t testing.T, cb func(c *config.Config)) *Client {
|
||||
|
||||
logger := testlog.Logger(t)
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
mockService := newMockConsulServiceClient(t)
|
||||
mockService.logger = logger
|
||||
mockService := consulApi.NewMockConsulServiceClient(t)
|
||||
mockService.Logger = logger
|
||||
client, err := NewClient(conf, catalog, mockService, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -73,16 +71,3 @@ func shuffleStrings(list []string) {
|
||||
list[i], list[j] = list[j], list[i]
|
||||
}
|
||||
}
|
||||
|
||||
// pre060RestoreState is used to read back in the persisted state for pre v0.6.0
|
||||
// state
|
||||
func pre060RestoreState(path string, data interface{}) error {
|
||||
buf, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := json.Unmarshal(buf, data); err != nil {
|
||||
return fmt.Errorf("failed to decode state: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
"github.com/posener/complete"
)

@@ -425,7 +425,7 @@ func buildDisplayMessage(event *api.TaskEvent) string {
desc = strings.Join(parts, ", ")
case api.TaskRestarting:
in := fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay))
if event.RestartReason != "" && event.RestartReason != client.ReasonWithinPolicy {
if event.RestartReason != "" && event.RestartReason != restarts.ReasonWithinPolicy {
desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
} else {
desc = in