From a62e412b88eafdb8e65a983a5f2a71431ca103fc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 11 Jun 2018 13:33:18 -0700 Subject: [PATCH] Refactor - wip --- client/{ => allocrunner}/alloc_runner.go | 170 ++++----- .../alloc_runner_health_watcher.go | 7 +- client/{ => allocrunner}/alloc_runner_test.go | 360 ++---------------- client/{ => allocrunner}/alloc_watcher.go | 30 +- .../{ => allocrunner}/alloc_watcher_test.go | 18 +- client/{ => allocrunner}/getter/getter.go | 0 .../{ => allocrunner}/getter/getter_test.go | 0 .../getter/test-fixtures/archive.tar.gz | Bin .../test-fixtures/archive/exist/my.config | 0 .../test-fixtures/archive/new/my.config | 0 .../getter/test-fixtures/archive/test.sh | 0 .../getter/test-fixtures/test.sh | 0 .../taskrunner}/consul_template.go | 2 +- .../taskrunner}/consul_template_test.go | 2 +- client/allocrunner/taskrunner/getters.go | 19 + .../taskrunner/restarts}/restarts.go | 11 +- .../taskrunner/restarts}/restarts_test.go | 24 +- .../taskrunner}/task_runner.go | 61 ++- .../taskrunner}/task_runner_test.go | 42 +- .../taskrunner}/task_runner_unix_test.go | 2 +- client/allocrunner/testing.go | 65 ++++ client/client.go | 37 +- client/client_test.go | 5 +- client/{ => consul}/consul.go | 2 +- client/consul/consul_testing.go | 86 +++++ client/consul_testing.go | 86 ----- client/fingerprint_manager_test.go | 27 +- client/fs_endpoint_test.go | 24 +- client/gc.go | 9 +- client/gc_test.go | 74 ++-- client/{ => state}/state_database.go | 28 +- client/testing.go | 5 +- client/util.go | 15 - command/alloc_status.go | 4 +- 34 files changed, 490 insertions(+), 725 deletions(-) rename client/{ => allocrunner}/alloc_runner.go (89%) rename client/{ => allocrunner}/alloc_runner_health_watcher.go (98%) rename client/{ => allocrunner}/alloc_runner_test.go (79%) rename client/{ => allocrunner}/alloc_watcher.go (94%) rename client/{ => allocrunner}/alloc_watcher_test.go (95%) rename client/{ => allocrunner}/getter/getter.go (100%) rename client/{ => allocrunner}/getter/getter_test.go (100%) rename client/{ => allocrunner}/getter/test-fixtures/archive.tar.gz (100%) rename client/{ => allocrunner}/getter/test-fixtures/archive/exist/my.config (100%) rename client/{ => allocrunner}/getter/test-fixtures/archive/new/my.config (100%) rename client/{ => allocrunner}/getter/test-fixtures/archive/test.sh (100%) rename client/{ => allocrunner}/getter/test-fixtures/test.sh (100%) rename client/{ => allocrunner/taskrunner}/consul_template.go (99%) rename client/{ => allocrunner/taskrunner}/consul_template_test.go (99%) create mode 100644 client/allocrunner/taskrunner/getters.go rename client/{ => allocrunner/taskrunner/restarts}/restarts.go (95%) rename client/{ => allocrunner/taskrunner/restarts}/restarts_test.go (91%) rename client/{ => allocrunner/taskrunner}/task_runner.go (97%) rename client/{ => allocrunner/taskrunner}/task_runner_test.go (98%) rename client/{ => allocrunner/taskrunner}/task_runner_unix_test.go (99%) create mode 100644 client/allocrunner/testing.go rename client/{ => consul}/consul.go (96%) create mode 100644 client/consul/consul_testing.go delete mode 100644 client/consul_testing.go rename client/{ => state}/state_database.go (83%) diff --git a/client/alloc_runner.go b/client/allocrunner/alloc_runner.go similarity index 89% rename from client/alloc_runner.go rename to client/allocrunner/alloc_runner.go index aeb284a3b..3a4b5a417 100644 --- a/client/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -1,10 +1,9 @@ -package client +package allocrunner import ( 
"context" "fmt" "log" - "os" "path/filepath" "sync" "time" @@ -13,7 +12,10 @@ import ( "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" @@ -60,7 +62,7 @@ type AllocRunner struct { allocDir *allocdir.AllocDir allocDirLock sync.Mutex - tasks map[string]*TaskRunner + tasks map[string]*taskrunner.TaskRunner taskStates map[string]*structs.TaskState restored map[string]struct{} taskLock sync.RWMutex @@ -70,7 +72,7 @@ type AllocRunner struct { updateCh chan *structs.Allocation vaultClient vaultclient.VaultClient - consulClient ConsulServiceAPI + consulClient consulApi.ConsulServiceAPI // prevAlloc allows for Waiting until a previous allocation exits and // the migrates it data. If sticky volumes aren't used and there's no @@ -154,7 +156,7 @@ type allocRunnerMutableState struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, - alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI, + alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI, prevAlloc prevAllocWatcher) *AllocRunner { ar := &AllocRunner{ @@ -168,7 +170,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, prevAlloc: prevAlloc, dirtyCh: make(chan struct{}, 1), allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), - tasks: make(map[string]*TaskRunner), + tasks: make(map[string]*taskrunner.TaskRunner), taskStates: copyTaskStates(alloc.TaskStates), restored: make(map[string]struct{}), updateCh: make(chan *structs.Allocation, 64), @@ -220,82 +222,44 @@ func (r *AllocRunner) pre060StateFilePath() string { // RestoreState is used to restore the state of the alloc runner func (r *AllocRunner) RestoreState() error { - - // COMPAT: Remove in 0.7.0 - // Check if the old snapshot is there - oldPath := r.pre060StateFilePath() - var snap allocRunnerState - var upgrading bool - if err := pre060RestoreState(oldPath, &snap); err == nil { - // Restore fields - r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.allocID) - r.alloc = snap.Alloc - r.allocDir = snap.AllocDir - r.allocClientStatus = snap.AllocClientStatus - r.allocClientDescription = snap.AllocClientDescription - - if r.alloc != nil { - r.taskStates = snap.Alloc.TaskStates - } - - // COMPAT: Remove in 0.7.0 - // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old - // Context struct to new AllocDir struct - if snap.AllocDir == nil && snap.Context != nil { - r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.allocID) - r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir) - for taskName := range snap.Context.AllocDir.TaskDirs { - r.allocDir.NewTaskDir(taskName) - } - } - - // Delete the old state - os.RemoveAll(oldPath) - upgrading = true - } else if !os.IsNotExist(err) { - // Something corrupt in the old state file - return err - } else { - // We are doing a normal restore - err := r.stateDB.View(func(tx *bolt.Tx) error { - bkt, err := 
getAllocationBucket(tx, r.allocID) - if err != nil { - return fmt.Errorf("failed to get allocation bucket: %v", err) - } - - // Get the state objects - var mutable allocRunnerMutableState - var immutable allocRunnerImmutableState - var allocState allocRunnerAllocState - var allocDir allocdir.AllocDir - - if err := getObject(bkt, allocRunnerStateAllocKey, &allocState); err != nil { - return fmt.Errorf("failed to read alloc runner alloc state: %v", err) - } - if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil { - return fmt.Errorf("failed to read alloc runner immutable state: %v", err) - } - if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil { - return fmt.Errorf("failed to read alloc runner mutable state: %v", err) - } - if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil { - return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err) - } - - // Populate the fields - r.alloc = allocState.Alloc - r.allocDir = &allocDir - r.allocClientStatus = mutable.AllocClientStatus - r.allocClientDescription = mutable.AllocClientDescription - r.taskStates = mutable.TaskStates - r.alloc.ClientStatus = getClientStatus(r.taskStates) - r.alloc.DeploymentStatus = mutable.DeploymentStatus - return nil - }) - + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := state.GetAllocationBucket(tx, r.allocID) if err != nil { - return fmt.Errorf("failed to read allocation state: %v", err) + return fmt.Errorf("failed to get allocation bucket: %v", err) } + + // Get the state objects + var mutable allocRunnerMutableState + var immutable allocRunnerImmutableState + var allocState allocRunnerAllocState + var allocDir allocdir.AllocDir + + if err := state.GetObject(bkt, allocRunnerStateAllocKey, &allocState); err != nil { + return fmt.Errorf("failed to read alloc runner alloc state: %v", err) + } + if err := state.GetObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil { + return fmt.Errorf("failed to read alloc runner immutable state: %v", err) + } + if err := state.GetObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil { + return fmt.Errorf("failed to read alloc runner mutable state: %v", err) + } + if err := state.GetObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil { + return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err) + } + + // Populate the fields + r.alloc = allocState.Alloc + r.allocDir = &allocDir + r.allocClientStatus = mutable.AllocClientStatus + r.allocClientDescription = mutable.AllocClientDescription + r.taskStates = mutable.TaskStates + r.alloc.ClientStatus = getClientStatus(r.taskStates) + r.alloc.DeploymentStatus = mutable.DeploymentStatus + return nil + }) + + if err != nil { + return fmt.Errorf("failed to read allocation state: %v", err) } var snapshotErrors multierror.Error @@ -344,7 +308,7 @@ func (r *AllocRunner) RestoreState() error { continue } - tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) + tr := taskrunner.NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient) r.tasks[name] = tr if restartReason, err := tr.RestoreState(); err != nil { @@ -354,12 +318,6 @@ func (r *AllocRunner) RestoreState() error { // Only start if the alloc isn't in a terminal status. 
go tr.Run() - if upgrading { - if err := tr.SaveState(); err != nil { - r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.allocID, name, err) - } - } - // Restart task runner if RestoreState gave a reason if restartReason != "" { r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason) @@ -389,7 +347,7 @@ func (r *AllocRunner) SaveState() error { for _, tr := range runners { if err := tr.SaveState(); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to save state for alloc %s task %q: %v", - r.allocID, tr.task.Name, err)) + r.allocID, tr.Name(), err)) } } return mErr.ErrorOrNil() @@ -419,7 +377,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { return r.stateDB.Batch(func(tx *bolt.Tx) error { // Grab the allocation bucket - allocBkt, err := getAllocationBucket(tx, r.allocID) + allocBkt, err := state.GetAllocationBucket(tx, r.allocID) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } @@ -433,7 +391,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { Alloc: alloc, } - if err := putObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil { + if err := state.PutObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil { return fmt.Errorf("failed to write alloc_runner alloc state: %v", err) } @@ -450,7 +408,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { Version: r.config.Version.VersionNumber(), } - if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil { + if err := state.PutObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil { return fmt.Errorf("failed to write alloc_runner immutable state: %v", err) } @@ -461,7 +419,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { // Write the alloc dir data if it hasn't been written before and it exists. 
if !r.allocDirPersisted && allocDir != nil { - if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { + if err := state.PutObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil { return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err) } @@ -478,7 +436,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { DeploymentStatus: alloc.DeploymentStatus, } - if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { + if err := state.PutObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { return fmt.Errorf("failed to write alloc_runner mutable state: %v", err) } @@ -492,7 +450,7 @@ func (r *AllocRunner) DestroyState() error { defer r.allocStateLock.Unlock() return r.stateDB.Update(func(tx *bolt.Tx) error { - if err := deleteAllocationBucket(tx, r.allocID); err != nil { + if err := state.DeleteAllocationBucket(tx, r.allocID); err != nil { return fmt.Errorf("failed to delete allocation bucket: %v", err) } return nil @@ -754,14 +712,14 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv // Find all tasks that are not the one that is dead and check if the one // that is dead is a leader - var otherTaskRunners []*TaskRunner + var otherTaskRunners []*taskrunner.TaskRunner var otherTaskNames []string leader := false for task, tr := range r.tasks { if task != taskName { otherTaskRunners = append(otherTaskRunners, tr) otherTaskNames = append(otherTaskNames, task) - } else if tr.task.Leader { + } else if tr.IsLeader() { leader = true } } @@ -922,7 +880,7 @@ func (r *AllocRunner) Run() { taskdir := r.allocDir.NewTaskDir(task.Name) r.allocDirLock.Unlock() - tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient) + tr := taskrunner.NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient) r.tasks[task.Name] = tr tr.MarkReceived() @@ -1118,11 +1076,11 @@ func (r *AllocRunner) StatsReporter() AllocStatsReporter { // getTaskRunners is a helper that returns a copy of the task runners list using // the taskLock. -func (r *AllocRunner) getTaskRunners() []*TaskRunner { +func (r *AllocRunner) getTaskRunners() []*taskrunner.TaskRunner { // Get the task runners r.taskLock.RLock() defer r.taskLock.RUnlock() - runners := make([]*TaskRunner, 0, len(r.tasks)) + runners := make([]*taskrunner.TaskRunner, 0, len(r.tasks)) for _, tr := range r.tasks { runners = append(runners, tr) } @@ -1156,7 +1114,7 @@ func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResour for _, tr := range runners { l := tr.LatestResourceUsage() if l != nil { - astat.Tasks[tr.task.Name] = l + astat.Tasks[tr.Name()] = l flat = append(flat, l) if l.Timestamp > astat.Timestamp { astat.Timestamp = l.Timestamp @@ -1181,9 +1139,9 @@ func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.Resour return summed } -// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and +// ShouldUpdate takes the AllocModifyIndex of an allocation sent from the server and // checks if the current running allocation is behind and should be updated. 
-func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool { +func (r *AllocRunner) ShouldUpdate(serverIndex uint64) bool { r.allocLock.Lock() defer r.allocLock.Unlock() return r.alloc.AllocModifyIndex < serverIndex @@ -1215,3 +1173,11 @@ func (r *AllocRunner) IsDestroyed() bool { func (r *AllocRunner) WaitCh() <-chan struct{} { return r.waitCh } + +// AllocID returns the allocation ID of the allocation being run +func (r *AllocRunner) AllocID() string { + if r == nil { + return "" + } + return r.allocID +} diff --git a/client/alloc_runner_health_watcher.go b/client/allocrunner/alloc_runner_health_watcher.go similarity index 98% rename from client/alloc_runner_health_watcher.go rename to client/allocrunner/alloc_runner_health_watcher.go index ede4eaeaa..81c726f73 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/allocrunner/alloc_runner_health_watcher.go @@ -1,4 +1,4 @@ -package client +package allocrunner import ( "context" @@ -9,6 +9,7 @@ import ( "time" "github.com/hashicorp/consul/api" + consulApi "github.com/hashicorp/nomad/client/consul" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -160,7 +161,7 @@ type allocHealthTracker struct { allocUpdates *cstructs.AllocListener // consulClient is used to look up the state of the task's checks - consulClient ConsulServiceAPI + consulClient consulApi.ConsulServiceAPI // healthy is used to signal whether we have determined the allocation to be // healthy or unhealthy @@ -191,7 +192,7 @@ type allocHealthTracker struct { // alloc listener and consul API object are given so that the watcher can detect // health changes. func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc *structs.Allocation, - allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI, + allocUpdates *cstructs.AllocListener, consulClient consulApi.ConsulServiceAPI, minHealthyTime time.Duration, useChecks bool) *allocHealthTracker { a := &allocHealthTracker{ diff --git a/client/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go similarity index 79% rename from client/alloc_runner_test.go rename to client/allocrunner/alloc_runner_test.go index 34f483feb..f8a8d048e 100644 --- a/client/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1,4 +1,4 @@ -package client +package allocrunner import ( "fmt" @@ -6,93 +6,34 @@ import ( "os" "path/filepath" "strings" - "sync" "testing" - "text/template" "time" "github.com/boltdb/bolt" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/hashicorp/nomad/version" - "github.com/kr/pretty" "github.com/stretchr/testify/assert" - "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner" + consulApi "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/state" "github.com/stretchr/testify/require" ) -type MockAllocStateUpdater struct { - Allocs []*structs.Allocation - mu sync.Mutex -} - -// Update fulfills the TaskStateUpdater interface -func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) { - m.mu.Lock() - m.Allocs = append(m.Allocs, alloc) - m.mu.Unlock() -} - 
-// Last returns a copy of the last alloc (or nil) sync'd -func (m *MockAllocStateUpdater) Last() *structs.Allocation { - m.mu.Lock() - defer m.mu.Unlock() - n := len(m.Allocs) - if n == 0 { - return nil - } - return m.Allocs[n-1].Copy() -} - // allocationBucketExists checks if the allocation bucket was created. func allocationBucketExists(tx *bolt.Tx, allocID string) bool { - allocations := tx.Bucket(allocationsBucket) - if allocations == nil { - return false - } - - // Retrieve the specific allocations bucket - alloc := allocations.Bucket([]byte(allocID)) - return alloc != nil -} - -func testAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts bool) (*MockAllocStateUpdater, *AllocRunner) { - conf := config.DefaultConfig() - conf.Node = mock.Node() - conf.StateDir = os.TempDir() - conf.AllocDir = os.TempDir() - tmp, _ := ioutil.TempFile("", "state-db") - db, _ := bolt.Open(tmp.Name(), 0600, nil) - upd := &MockAllocStateUpdater{} - if !restarts { - *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} - alloc.Job.Type = structs.JobTypeBatch - } - vclient := vaultclient.NewMockVaultClient() - ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient(t), noopPrevAlloc{}) - return upd, ar -} - -func testAllocRunner(t *testing.T, restarts bool) (*MockAllocStateUpdater, *AllocRunner) { - // Use mock driver - alloc := mock.Alloc() - task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - task.Config["run_for"] = "500ms" - return testAllocRunnerFromAlloc(t, alloc, restarts) + bucket, err := state.GetAllocationBucket(tx, allocID) + return err == nil && bucket != nil } func TestAllocRunner_SimpleRun(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) go ar.Run() defer ar.Destroy() @@ -114,7 +55,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) { func TestAllocRunner_FinishedAtSet(t *testing.T) { t.Parallel() require := require.New(t) - _, ar := testAllocRunner(t, false) + _, ar := TestAllocRunner(t, false) ar.allocClientStatus = structs.AllocClientStatusFailed alloc := ar.Alloc() taskFinishedAt := make(map[string]time.Time) @@ -136,7 +77,7 @@ func TestAllocRunner_FinishedAtSet(t *testing.T) { func TestAllocRunner_FinishedAtSet_TaskEvents(t *testing.T) { t.Parallel() require := require.New(t) - _, ar := testAllocRunner(t, false) + _, ar := TestAllocRunner(t, false) ar.taskStates[ar.alloc.Job.TaskGroups[0].Tasks[0].Name] = &structs.TaskState{State: structs.TaskStateDead, Failed: true} alloc := ar.Alloc() @@ -161,7 +102,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_BadStart(t *testing.T) { assert := assert.New(t) // Ensure the task fails and restarts - upd, ar := testAllocRunner(t, true) + upd, ar := TestAllocRunner(t, true) // Make the task fail task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -208,7 +149,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Deadline(t *testing.T) { t.Parallel() // Don't restart but force service job type - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) ar.alloc.Job.Type = structs.JobTypeService // Make the task block @@ -268,7 +209,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_NoChecks(t *testing.T) { t.Parallel() // Ensure the task fails and restarts - upd, ar := testAllocRunner(t, true) + upd, ar := TestAllocRunner(t, true) // Make the task run healthy task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -316,7 +257,7 @@ func 
TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { t.Parallel() // Ensure the task fails and restarts - upd, ar := testAllocRunner(t, true) + upd, ar := TestAllocRunner(t, true) // Make the task fail task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -347,7 +288,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { // Only return the check as healthy after a duration trigger := time.After(500 * time.Millisecond) - ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { + ar.consulClient.(*consulApi.MockConsulServiceClient).AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { select { case <-trigger: return &consul.AllocRegistration{ @@ -409,7 +350,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) { assert := assert.New(t) // Ensure the task fails and restarts - upd, ar := testAllocRunner(t, true) + upd, ar := TestAllocRunner(t, true) // Make the task fail task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -430,7 +371,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) { } // Only return the check as healthy after a duration - ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { + ar.consulClient.(*consulApi.MockConsulServiceClient).AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { return &consul.AllocRegistration{ Tasks: map[string]*consul.TaskRegistration{ task.Name: { @@ -478,7 +419,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_UpdatedDeployment(t *testing.T) { t.Parallel() // Ensure the task fails and restarts - upd, ar := testAllocRunner(t, true) + upd, ar := TestAllocRunner(t, true) // Make the task run healthy task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -535,7 +476,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_Migration(t *testing.T) { t.Parallel() // Ensure the task fails and restarts - upd, ar := testAllocRunner(t, true) + upd, ar := TestAllocRunner(t, true) // Make the task run healthy tg := ar.alloc.Job.TaskGroups[0] @@ -588,7 +529,7 @@ func TestAllocRunner_DeploymentHealth_BatchDisabled(t *testing.T) { task := tg.Tasks[0] task.Driver = "mock_driver" task.Config["run_for"] = "5s" - upd, ar := testAllocRunnerFromAlloc(t, alloc, false) + upd, ar := TestAllocRunnerFromAlloc(t, alloc, false) go ar.Run() defer ar.Destroy() @@ -634,7 +575,7 @@ func TestAllocRunner_RetryArtifact(t *testing.T) { } alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, badtask) - upd, ar := testAllocRunnerFromAlloc(t, alloc, true) + upd, ar := TestAllocRunnerFromAlloc(t, alloc, true) go ar.Run() defer ar.Destroy() @@ -673,7 +614,7 @@ func TestAllocRunner_RetryArtifact(t *testing.T) { func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -772,7 +713,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { func TestAllocRunner_Destroy(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -828,7 +769,7 @@ func TestAllocRunner_Destroy(t *testing.T) { func TestAllocRunner_Update(t *testing.T) { t.Parallel() - _, ar := testAllocRunner(t, false) + _, ar := TestAllocRunner(t, false) // Deep copy the alloc to avoid 
races when updating newAlloc := ar.Alloc().Copy() @@ -863,7 +804,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { "run_for": "10s", } - upd, ar := testAllocRunnerFromAlloc(t, alloc, false) + upd, ar := TestAllocRunnerFromAlloc(t, alloc, false) go ar.Run() defer ar.Destroy() @@ -882,9 +823,9 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { } // Create a new alloc runner - l2 := prefixedTestLogger("----- ar2: ") + l2 := testlog.WithPrefix(t, "----- ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") + prevAlloc := NewAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -931,8 +872,8 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) - ar.logger = prefixedTestLogger("ar1: ") + upd, ar := TestAllocRunner(t, false) + ar.logger = testlog.WithPrefix(t, "ar1: ") // Ensure task takes some time ar.alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" @@ -977,9 +918,9 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { defer ar.allocLock.Unlock() // Create a new alloc runner - l2 := prefixedTestLogger("ar2: ") + l2 := testlog.WithPrefix(t, "ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") + prevAlloc := NewAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -1052,230 +993,9 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { }) } -// TestAllocRunner_SaveRestoreState_Upgrade asserts that pre-0.6 exec tasks are -// restarted on upgrade. 
-func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { - t.Parallel() - alloc := mock.Alloc() - task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - task.Config = map[string]interface{}{ - "exit_code": "0", - "run_for": "10s", - } - - upd, ar := testAllocRunnerFromAlloc(t, alloc, false) - // Hack in old version to cause an upgrade on RestoreState - origConfig := ar.config.Copy() - ar.config.Version = &version.VersionInfo{Version: "0.5.6"} - go ar.Run() - defer ar.Destroy() - - // Snapshot state - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - - if last.ClientStatus != structs.AllocClientStatusRunning { - return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) - } - return true, nil - }, func(err error) { - t.Fatalf("task never started: %v", err) - }) - - err := ar.SaveState() - if err != nil { - t.Fatalf("err: %v", err) - } - - // Create a new alloc runner - l2 := prefixedTestLogger("ar2: ") - alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "") - ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) - err = ar2.RestoreState() - if err != nil { - t.Fatalf("err: %v", err) - } - go ar2.Run() - defer ar2.Destroy() // Just-in-case of failure before Destroy below - - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - for _, ev := range last.TaskStates["web"].Events { - if strings.HasSuffix(ev.RestartReason, pre06ScriptCheckReason) { - return true, nil - } - } - return false, fmt.Errorf("no restart with proper reason found") - }, func(err error) { - last := upd.Last() - t.Fatalf("err: %v\nweb state: % #v", err, pretty.Formatter(last.TaskStates["web"])) - }) - - // Destroy and wait - ar2.Destroy() - start := time.Now() - - testutil.WaitForResult(func() (bool, error) { - alloc := ar2.Alloc() - if alloc.ClientStatus != structs.AllocClientStatusComplete { - return false, fmt.Errorf("Bad client status; got %v; want %v", alloc.ClientStatus, structs.AllocClientStatusComplete) - } - return true, nil - }, func(err error) { - last := upd.Last() - t.Fatalf("err: %v %#v %#v", err, last, last.TaskStates) - }) - - if time.Since(start) > time.Duration(testutil.TestMultiplier()*5)*time.Second { - t.Fatalf("took too long to terminate") - } -} - -// Ensure pre-#2132 state files containing the Context struct are properly -// migrated to the new format. 
-// -// Old Context State: -// -// "Context": { -// "AllocDir": { -// "AllocDir": "/path/to/allocs/2a54fcff-fc44-8d4f-e025-53c48e9cbbbb", -// "SharedDir": "/path/to/allocs/2a54fcff-fc44-8d4f-e025-53c48e9cbbbb/alloc", -// "TaskDirs": { -// "echo1": "/path/to/allocs/2a54fcff-fc44-8d4f-e025-53c48e9cbbbb/echo1" -// } -// }, -// "AllocID": "2a54fcff-fc44-8d4f-e025-53c48e9cbbbb" -// } -func TestAllocRunner_RestoreOldState(t *testing.T) { - t.Parallel() - alloc := mock.Alloc() - task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - task.Config = map[string]interface{}{ - "exit_code": "0", - "run_for": "10s", - } - - logger := testLogger() - conf := config.DefaultConfig() - conf.Node = mock.Node() - conf.StateDir = os.TempDir() - conf.AllocDir = os.TempDir() - tmp, err := ioutil.TempFile("", "state-db") - if err != nil { - t.Fatalf("error creating state db file: %v", err) - } - db, err := bolt.Open(tmp.Name(), 0600, nil) - if err != nil { - t.Fatalf("error creating state db: %v", err) - } - - if err := os.MkdirAll(filepath.Join(conf.StateDir, "alloc", alloc.ID), 0777); err != nil { - t.Fatalf("error creating state dir: %v", err) - } - statePath := filepath.Join(conf.StateDir, "alloc", alloc.ID, "state.json") - w, err := os.Create(statePath) - if err != nil { - t.Fatalf("error creating state file: %v", err) - } - tmplctx := &struct { - AllocID string - AllocDir string - }{alloc.ID, conf.AllocDir} - err = template.Must(template.New("test_state").Parse(`{ - "Version": "0.5.1", - "Alloc": { - "ID": "{{ .AllocID }}", - "Name": "example", - "JobID": "example", - "Job": { - "ID": "example", - "Name": "example", - "Type": "batch", - "TaskGroups": [ - { - "Name": "example", - "Tasks": [ - { - "Name": "example", - "Driver": "mock", - "Config": { - "exit_code": "0", - "run_for": "10s" - } - } - ] - } - ] - }, - "TaskGroup": "example", - "DesiredStatus": "run", - "ClientStatus": "running", - "TaskStates": { - "example": { - "State": "running", - "Failed": false, - "Events": [] - } - } - }, - "Context": { - "AllocDir": { - "AllocDir": "{{ .AllocDir }}/{{ .AllocID }}", - "SharedDir": "{{ .AllocDir }}/{{ .AllocID }}/alloc", - "TaskDirs": { - "example": "{{ .AllocDir }}/{{ .AllocID }}/example" - } - }, - "AllocID": "{{ .AllocID }}" - } -}`)).Execute(w, tmplctx) - if err != nil { - t.Fatalf("error writing state file: %v", err) - } - w.Close() - - upd := &MockAllocStateUpdater{} - *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} - alloc.Job.Type = structs.JobTypeBatch - vclient := vaultclient.NewMockVaultClient() - cclient := newMockConsulServiceClient(t) - ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient, noopPrevAlloc{}) - defer ar.Destroy() - - // RestoreState should fail on the task state since we only test the - // alloc state restoring. 
- err = ar.RestoreState() - if err == nil { - t.Fatal("expected error restoring Task state") - } - merr, ok := err.(*multierror.Error) - if !ok { - t.Fatalf("expected RestoreState to return a multierror but found: %T -> %v", err, err) - } - if len(merr.Errors) != 1 { - t.Fatalf("expected exactly 1 error from RestoreState but found: %d: %v", len(merr.Errors), err) - } - if expected := "failed to get task bucket"; !strings.Contains(merr.Errors[0].Error(), expected) { - t.Fatalf("expected %q but got: %q", expected, merr.Errors[0].Error()) - } - - if err := ar.SaveState(); err != nil { - t.Fatalf("error saving new state: %v", err) - } -} - func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) // Create two tasks in the task group task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -1343,7 +1063,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) // Create two tasks in the task group task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -1417,7 +1137,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { // with a leader the leader is stopped before other tasks. func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { t.Parallel() - upd, ar := testAllocRunner(t, false) + upd, ar := TestAllocRunner(t, false) // Create 3 tasks in the task group task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -1510,7 +1230,7 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { // See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932 func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { t.Parallel() - _, ar := testAllocRunner(t, false) + _, ar := TestAllocRunner(t, false) defer ar.Destroy() // Create a leader and follower task in the task group @@ -1535,11 +1255,11 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { ar.alloc.TaskResources[task2.Name] = task2.Resources // Mimic Nomad exiting before the leader stopping is able to stop other tasks. 
- ar.tasks = map[string]*TaskRunner{ - "leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState, + ar.tasks = map[string]*taskrunner.TaskRunner{ + "leader": taskrunner.NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState, ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(), ar.vaultClient, ar.consulClient), - "follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState, + "follower1": taskrunner.NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState, ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(), ar.vaultClient, ar.consulClient), } @@ -1606,7 +1326,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { task.Config = map[string]interface{}{ "run_for": "1s", } - upd, ar := testAllocRunnerFromAlloc(t, alloc, false) + upd, ar := TestAllocRunnerFromAlloc(t, alloc, false) go ar.Run() defer ar.Destroy() @@ -1639,10 +1359,10 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { task.Config = map[string]interface{}{ "run_for": "1s", } - upd2, ar2 := testAllocRunnerFromAlloc(t, alloc2, false) + upd2, ar2 := TestAllocRunnerFromAlloc(t, alloc2, false) // Set prevAlloc like Client does - ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "") + ar2.prevAlloc = NewAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "") go ar2.Run() defer ar2.Destroy() diff --git a/client/alloc_watcher.go b/client/allocrunner/alloc_watcher.go similarity index 94% rename from client/alloc_watcher.go rename to client/allocrunner/alloc_watcher.go index e84db9ed2..8fdd6bf27 100644 --- a/client/alloc_watcher.go +++ b/client/allocrunner/alloc_watcher.go @@ -1,4 +1,4 @@ -package client +package allocrunner import ( "archive/tar" @@ -20,6 +20,12 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // getRemoteRetryIntv is minimum interval on which we retry + // to fetch remote objects. We pick a value between this and 2x this. + getRemoteRetryIntv = 30 * time.Second +) + // rpcer is the interface needed by a prevAllocWatcher to make RPC calls. type rpcer interface { // RPC allows retrieving remote allocs. @@ -49,13 +55,13 @@ type prevAllocWatcher interface { IsMigrating() bool } -// newAllocWatcher creates a prevAllocWatcher appropriate for whether this +// NewAllocWatcher creates a prevAllocWatcher appropriate for whether this // alloc's previous allocation was local or remote. If this alloc has no // previous alloc then a noop implementation is returned. 
-func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher { +func NewAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher { if alloc.PreviousAllocation == "" { // No previous allocation, use noop transitioner - return noopPrevAlloc{} + return NoopPrevAlloc{} } tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) @@ -295,7 +301,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp) if err != nil { p.logger.Printf("[ERR] client: failed to query previous alloc %q: %v", p.prevAllocID, err) - retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) + retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv) select { case <-time.After(retry): continue @@ -386,7 +392,7 @@ func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (strin err := p.rpc.RPC("Node.GetNode", &req, &resp) if err != nil { p.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err) - retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) + retry := getRemoteRetryIntv + lib.RandomStagger(getRemoteRetryIntv) select { case <-time.After(retry): continue @@ -568,15 +574,15 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser return nil } -// noopPrevAlloc does not block or migrate on a previous allocation and never +// NoopPrevAlloc does not block or migrate on a previous allocation and never // returns an error. -type noopPrevAlloc struct{} +type NoopPrevAlloc struct{} // Wait returns nil immediately. -func (noopPrevAlloc) Wait(context.Context) error { return nil } +func (NoopPrevAlloc) Wait(context.Context) error { return nil } // Migrate returns nil immediately. -func (noopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil } +func (NoopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil } -func (noopPrevAlloc) IsWaiting() bool { return false } -func (noopPrevAlloc) IsMigrating() bool { return false } +func (NoopPrevAlloc) IsWaiting() bool { return false } +func (NoopPrevAlloc) IsMigrating() bool { return false } diff --git a/client/alloc_watcher_test.go b/client/allocrunner/alloc_watcher_test.go similarity index 95% rename from client/alloc_watcher_test.go rename to client/allocrunner/alloc_watcher_test.go index 43048363b..907f4a521 100644 --- a/client/alloc_watcher_test.go +++ b/client/allocrunner/alloc_watcher_test.go @@ -1,4 +1,4 @@ -package client +package allocrunner import ( "archive/tar" @@ -15,15 +15,15 @@ import ( "time" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/testutil" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" ) // TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is // set a localPrevAlloc will block on it. 
func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { - _, prevAR := testAllocRunner(t, false) + _, prevAR := TestAllocRunner(t, false) prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s" newAlloc := mock.Alloc() @@ -33,7 +33,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { task.Driver = "mock_driver" task.Config["run_for"] = "500ms" - waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "") + waiter := NewAllocWatcher(newAlloc, prevAR, nil, nil, testlog.Logger(t), "") // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) @@ -177,14 +177,8 @@ func TestPrevAlloc_StreamAllocDir_Ok(t *testing.T) { } defer os.RemoveAll(dir1) - c1 := TestClient(t, func(c *config.Config) { - c.RPCHandler = nil - }) - defer c1.Shutdown() - rc := ioutil.NopCloser(buf) - - prevAlloc := &remotePrevAlloc{logger: testLogger()} + prevAlloc := &remotePrevAlloc{logger: testlog.Logger(t)} if err := prevAlloc.streamAllocDir(context.Background(), rc, dir1); err != nil { t.Fatalf("err: %v", err) } @@ -234,7 +228,7 @@ func TestPrevAlloc_StreamAllocDir_Error(t *testing.T) { // This test only unit tests streamAllocDir so we only need a partially // complete remotePrevAlloc prevAlloc := &remotePrevAlloc{ - logger: testLogger(), + logger: testlog.Logger(t), allocID: "123", prevAllocID: "abc", migrate: true, diff --git a/client/getter/getter.go b/client/allocrunner/getter/getter.go similarity index 100% rename from client/getter/getter.go rename to client/allocrunner/getter/getter.go diff --git a/client/getter/getter_test.go b/client/allocrunner/getter/getter_test.go similarity index 100% rename from client/getter/getter_test.go rename to client/allocrunner/getter/getter_test.go diff --git a/client/getter/test-fixtures/archive.tar.gz b/client/allocrunner/getter/test-fixtures/archive.tar.gz similarity index 100% rename from client/getter/test-fixtures/archive.tar.gz rename to client/allocrunner/getter/test-fixtures/archive.tar.gz diff --git a/client/getter/test-fixtures/archive/exist/my.config b/client/allocrunner/getter/test-fixtures/archive/exist/my.config similarity index 100% rename from client/getter/test-fixtures/archive/exist/my.config rename to client/allocrunner/getter/test-fixtures/archive/exist/my.config diff --git a/client/getter/test-fixtures/archive/new/my.config b/client/allocrunner/getter/test-fixtures/archive/new/my.config similarity index 100% rename from client/getter/test-fixtures/archive/new/my.config rename to client/allocrunner/getter/test-fixtures/archive/new/my.config diff --git a/client/getter/test-fixtures/archive/test.sh b/client/allocrunner/getter/test-fixtures/archive/test.sh similarity index 100% rename from client/getter/test-fixtures/archive/test.sh rename to client/allocrunner/getter/test-fixtures/archive/test.sh diff --git a/client/getter/test-fixtures/test.sh b/client/allocrunner/getter/test-fixtures/test.sh similarity index 100% rename from client/getter/test-fixtures/test.sh rename to client/allocrunner/getter/test-fixtures/test.sh diff --git a/client/consul_template.go b/client/allocrunner/taskrunner/consul_template.go similarity index 99% rename from client/consul_template.go rename to client/allocrunner/taskrunner/consul_template.go index 3e4869706..ac7aed79c 100644 --- a/client/consul_template.go +++ b/client/allocrunner/taskrunner/consul_template.go @@ -1,4 +1,4 @@ -package client +package taskrunner import ( "fmt" diff --git a/client/consul_template_test.go 
b/client/allocrunner/taskrunner/consul_template_test.go similarity index 99% rename from client/consul_template_test.go rename to client/allocrunner/taskrunner/consul_template_test.go index fa79a61ee..d07744493 100644 --- a/client/consul_template_test.go +++ b/client/allocrunner/taskrunner/consul_template_test.go @@ -1,4 +1,4 @@ -package client +package taskrunner import ( "fmt" diff --git a/client/allocrunner/taskrunner/getters.go b/client/allocrunner/taskrunner/getters.go new file mode 100644 index 000000000..0a4bca06f --- /dev/null +++ b/client/allocrunner/taskrunner/getters.go @@ -0,0 +1,19 @@ +package taskrunner + +// Name returns the name of the task +func (r *TaskRunner) Name() string { + if r == nil || r.task == nil { + return "" + } + + return r.task.Name +} + +// IsLeader returns whether the task is a leader task +func (r *TaskRunner) IsLeader() bool { + if r == nil || r.task == nil { + return false + } + + return r.task.Leader +} diff --git a/client/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go similarity index 95% rename from client/restarts.go rename to client/allocrunner/taskrunner/restarts/restarts.go index ebdc62cff..05dd22958 100644 --- a/client/restarts.go +++ b/client/allocrunner/taskrunner/restarts/restarts.go @@ -1,4 +1,4 @@ -package client +package restarts import ( "fmt" @@ -20,7 +20,7 @@ const ( ReasonDelay = "Exceeded allowed attempts, applying a delay" ) -func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTracker { +func NewRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTracker { onSuccess := true if jobType == structs.JobTypeBatch { onSuccess = false @@ -54,6 +54,13 @@ func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) { r.policy = policy } +// GetPolicy returns a copy of the policy used to determine restarts. +func (r *RestartTracker) GetPolicy() *structs.RestartPolicy { + r.lock.Lock() + defer r.lock.Unlock() + return r.policy.Copy() +} + // SetStartError is used to mark the most recent start error. If starting was // successful the error should be nil. 
func (r *RestartTracker) SetStartError(err error) *RestartTracker { diff --git a/client/restarts_test.go b/client/allocrunner/taskrunner/restarts/restarts_test.go similarity index 91% rename from client/restarts_test.go rename to client/allocrunner/taskrunner/restarts/restarts_test.go index 915902e04..7b8e5ea36 100644 --- a/client/restarts_test.go +++ b/client/allocrunner/taskrunner/restarts/restarts_test.go @@ -1,4 +1,4 @@ -package client +package restarts import ( "fmt" @@ -32,7 +32,7 @@ func testWaitResult(exit int) *cstructs.WaitResult { func TestClient_RestartTracker_ModeDelay(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeDelay) - rt := newRestartTracker(p, structs.JobTypeService) + rt := NewRestartTracker(p, structs.JobTypeService) for i := 0; i < p.Attempts; i++ { state, when := rt.SetWaitResult(testWaitResult(127)).GetState() if state != structs.TaskRestarting { @@ -58,7 +58,7 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) { func TestClient_RestartTracker_ModeFail(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeFail) - rt := newRestartTracker(p, structs.JobTypeSystem) + rt := NewRestartTracker(p, structs.JobTypeSystem) for i := 0; i < p.Attempts; i++ { state, when := rt.SetWaitResult(testWaitResult(127)).GetState() if state != structs.TaskRestarting { @@ -78,7 +78,7 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) { func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) { t.Parallel() p := testPolicy(false, structs.RestartPolicyModeDelay) - rt := newRestartTracker(p, structs.JobTypeBatch) + rt := NewRestartTracker(p, structs.JobTypeBatch) if state, _ := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated { t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated) } @@ -90,28 +90,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) { p.Attempts = 0 // Test with a non-zero exit code - rt := newRestartTracker(p, structs.JobTypeService) + rt := NewRestartTracker(p, structs.JobTypeService) if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting { t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when) } // Even with a zero (successful) exit code non-batch jobs should exit // with TaskNotRestarting - rt = newRestartTracker(p, structs.JobTypeService) + rt = NewRestartTracker(p, structs.JobTypeService) if state, when := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskNotRestarting { t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when) } // Batch jobs with a zero exit code and 0 attempts *do* exit cleanly // with Terminated - rt = newRestartTracker(p, structs.JobTypeBatch) + rt = NewRestartTracker(p, structs.JobTypeBatch) if state, when := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated { t.Fatalf("expect terminated, got restart/delay: %v/%v", state, when) } // Batch jobs with a non-zero exit code and 0 attempts exit with // TaskNotRestarting - rt = newRestartTracker(p, structs.JobTypeBatch) + rt = NewRestartTracker(p, structs.JobTypeBatch) if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting { t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when) } @@ -121,7 +121,7 @@ func TestClient_RestartTracker_RestartTriggered(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeFail) p.Attempts = 0 - rt := newRestartTracker(p, 
structs.JobTypeService) + rt := NewRestartTracker(p, structs.JobTypeService) if state, when := rt.SetRestartTriggered(false).GetState(); state != structs.TaskRestarting && when != 0 { t.Fatalf("expect restart immediately, got %v %v", state, when) } @@ -131,7 +131,7 @@ func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeFail) p.Attempts = 1 - rt := newRestartTracker(p, structs.JobTypeService) + rt := NewRestartTracker(p, structs.JobTypeService) if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskRestarting || when == 0 { t.Fatalf("expect restart got %v %v", state, when) } @@ -143,7 +143,7 @@ func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) { func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeFail) - rt := newRestartTracker(p, structs.JobTypeSystem) + rt := NewRestartTracker(p, structs.JobTypeSystem) recErr := structs.NewRecoverableError(fmt.Errorf("foo"), true) for i := 0; i < p.Attempts; i++ { state, when := rt.SetStartError(recErr).GetState() @@ -164,7 +164,7 @@ func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) { func TestClient_RestartTracker_StartError_Recoverable_Delay(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeDelay) - rt := newRestartTracker(p, structs.JobTypeSystem) + rt := NewRestartTracker(p, structs.JobTypeSystem) recErr := structs.NewRecoverableError(fmt.Errorf("foo"), true) for i := 0; i < p.Attempts; i++ { state, when := rt.SetStartError(recErr).GetState() diff --git a/client/task_runner.go b/client/allocrunner/taskrunner/task_runner.go similarity index 97% rename from client/task_runner.go rename to client/allocrunner/taskrunner/task_runner.go index 4affba3e6..2cbb01008 100644 --- a/client/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -1,4 +1,4 @@ -package client +package taskrunner import ( "bytes" @@ -21,9 +21,12 @@ import ( "github.com/hashicorp/go-multierror" version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/getter" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/driver" - "github.com/hashicorp/nomad/client/getter" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" @@ -89,8 +92,8 @@ type TaskRunner struct { config *config.Config updater TaskStateUpdater logger *log.Logger - restartTracker *RestartTracker - consul ConsulServiceAPI + restartTracker *restarts.RestartTracker + consul consulApi.ConsulServiceAPI // running marks whether the task is running running bool @@ -230,7 +233,7 @@ type SignalEvent struct { func NewTaskRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater TaskStateUpdater, taskDir *allocdir.TaskDir, alloc *structs.Allocation, task *structs.Task, - vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner { + vaultClient vaultclient.VaultClient, consulClient consulApi.ConsulServiceAPI) *TaskRunner { // Merge in the task resources task.Resources = alloc.TaskResources[task.Name] @@ -241,7 +244,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, 
logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup) return nil } - restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) + restartTracker := restarts.NewRestartTracker(tg.RestartPolicy, alloc.Job.Type) // Initialize the environment builder envBuilder := env.NewBuilder(config.Node, alloc, task, config.Region) @@ -329,40 +332,20 @@ func (r *TaskRunner) pre060StateFilePath() string { // backwards incompatible upgrades that need to restart tasks with a new // executor. func (r *TaskRunner) RestoreState() (string, error) { - // COMPAT: Remove in 0.7.0 - // 0.6.0 transitioned from individual state files to a single bolt-db. - // The upgrade path is to: - // Check if old state exists - // If so, restore from that and delete old state - // Restore using state database - var snap taskRunnerState - - // Check if the old snapshot is there - oldPath := r.pre060StateFilePath() - if err := pre060RestoreState(oldPath, &snap); err == nil { - // Delete the old state - os.RemoveAll(oldPath) - } else if !os.IsNotExist(err) { - // Something corrupt in the old state file - return "", err - } else { - // We are doing a normal restore - err := r.stateDB.View(func(tx *bolt.Tx) error { - bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) - if err != nil { - return fmt.Errorf("failed to get task bucket: %v", err) - } - - if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil { - return fmt.Errorf("failed to read task runner state: %v", err) - } - return nil - }) + err := r.stateDB.View(func(tx *bolt.Tx) error { + bkt, err := state.GetTaskBucket(tx, r.alloc.ID, r.task.Name) if err != nil { - return "", err + return fmt.Errorf("failed to get task bucket: %v", err) } + if err := state.GetObject(bkt, taskRunnerStateAllKey, &snap); err != nil { + return fmt.Errorf("failed to read task runner state: %v", err) + } + return nil + }) + if err != nil { + return "", err } // Restore fields from the snapshot @@ -510,12 +493,12 @@ func (r *TaskRunner) SaveState() error { // Start the transaction. 
return r.stateDB.Batch(func(tx *bolt.Tx) error { // Grab the task bucket - taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name) + taskBkt, err := state.GetTaskBucket(tx, r.alloc.ID, r.task.Name) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - if err := putData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { + if err := state.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } @@ -534,7 +517,7 @@ func (r *TaskRunner) DestroyState() error { defer r.persistLock.Unlock() return r.stateDB.Update(func(tx *bolt.Tx) error { - if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { + if err := state.DeleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { return fmt.Errorf("failed to delete task bucket: %v", err) } return nil diff --git a/client/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go similarity index 98% rename from client/task_runner_test.go rename to client/allocrunner/taskrunner/task_runner_test.go index dc22abcaf..e481e5063 100644 --- a/client/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -1,4 +1,4 @@ -package client +package taskrunner import ( "fmt" @@ -17,7 +17,9 @@ import ( "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/driver/env" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" @@ -40,9 +42,9 @@ func prefixedTestLogger(prefix string) *log.Logger { } // Returns a tracker that never restarts. 
-func noRestartsTracker() *RestartTracker { +func noRestartsTracker() *restarts.RestartTracker { policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail} - return newRestartTracker(policy, structs.JobTypeBatch) + return restarts.NewRestartTracker(policy, structs.JobTypeBatch) } type MockTaskStateUpdater struct { @@ -387,8 +389,8 @@ func TestTaskRunner_Update(t *testing.T) { if ctx.tr.task.Driver != newTask.Driver { return false, fmt.Errorf("Task not copied") } - if ctx.tr.restartTracker.policy.Mode != newMode { - return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.policy.Mode) + if ctx.tr.restartTracker.GetPolicy().Mode != newMode { + return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.GetPolicy().Mode) } if ctx.tr.handle.ID() == oldHandle { return false, fmt.Errorf("handle not ctx.updated") @@ -642,7 +644,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { ctx := testTaskRunnerFromAlloc(t, true, alloc) // Use mockConsulServiceClient - consul := newMockConsulServiceClient(t) + consul := consulApi.NewMockConsulServiceClient(t) ctx.tr.consul = consul ctx.tr.MarkReceived() @@ -650,26 +652,26 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { defer ctx.Cleanup() // Assert it is properly registered and unregistered - if expected := 6; len(consul.ops) != expected { - t.Errorf("expected %d consul ops but found: %d", expected, len(consul.ops)) + if expected := 6; len(consul.Ops) != expected { + t.Errorf("expected %d consul ops but found: %d", expected, len(consul.Ops)) } - if consul.ops[0].op != "add" { - t.Errorf("expected first op to be add but found: %q", consul.ops[0].op) + if consul.Ops[0].Op != "add" { + t.Errorf("expected first Op to be add but found: %q", consul.Ops[0].Op) } - if consul.ops[1].op != "remove" { - t.Errorf("expected second op to be remove but found: %q", consul.ops[1].op) + if consul.Ops[1].Op != "remove" { + t.Errorf("expected second op to be remove but found: %q", consul.Ops[1].Op) } - if consul.ops[2].op != "remove" { - t.Errorf("expected third op to be remove but found: %q", consul.ops[2].op) + if consul.Ops[2].Op != "remove" { + t.Errorf("expected third op to be remove but found: %q", consul.Ops[2].Op) } - if consul.ops[3].op != "add" { - t.Errorf("expected fourth op to be add but found: %q", consul.ops[3].op) + if consul.Ops[3].Op != "add" { + t.Errorf("expected fourth op to be add but found: %q", consul.Ops[3].Op) } - if consul.ops[4].op != "remove" { - t.Errorf("expected fifth op to be remove but found: %q", consul.ops[4].op) + if consul.Ops[4].Op != "remove" { + t.Errorf("expected fifth op to be remove but found: %q", consul.Ops[4].Op) } - if consul.ops[5].op != "remove" { - t.Errorf("expected sixth op to be remove but found: %q", consul.ops[5].op) + if consul.Ops[5].Op != "remove" { + t.Errorf("expected sixth op to be remove but found: %q", consul.Ops[5].Op) } } diff --git a/client/task_runner_unix_test.go b/client/allocrunner/taskrunner/task_runner_unix_test.go similarity index 99% rename from client/task_runner_unix_test.go rename to client/allocrunner/taskrunner/task_runner_unix_test.go index b7c2aa441..1bb397f46 100644 --- a/client/task_runner_unix_test.go +++ b/client/allocrunner/taskrunner/task_runner_unix_test.go @@ -1,6 +1,6 @@ // +build !windows -package client +package taskrunner import ( "syscall" diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go new file mode 100644 index 
000000000..60a7b7b20 --- /dev/null +++ b/client/allocrunner/testing.go @@ -0,0 +1,65 @@ +package allocrunner + +import ( + "io/ioutil" + "os" + "sync" + "testing" + + "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +type MockAllocStateUpdater struct { + Allocs []*structs.Allocation + mu sync.Mutex +} + +// Update fulfills the TaskStateUpdater interface +func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) { + m.mu.Lock() + m.Allocs = append(m.Allocs, alloc) + m.mu.Unlock() +} + +// Last returns a copy of the last alloc (or nil) sync'd +func (m *MockAllocStateUpdater) Last() *structs.Allocation { + m.mu.Lock() + defer m.mu.Unlock() + n := len(m.Allocs) + if n == 0 { + return nil + } + return m.Allocs[n-1].Copy() +} + +func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts bool) (*MockAllocStateUpdater, *AllocRunner) { + conf := config.DefaultConfig() + conf.Node = mock.Node() + conf.StateDir = os.TempDir() + conf.AllocDir = os.TempDir() + tmp, _ := ioutil.TempFile("", "state-db") + db, _ := bolt.Open(tmp.Name(), 0600, nil) + upd := &MockAllocStateUpdater{} + if !restarts { + *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} + alloc.Job.Type = structs.JobTypeBatch + } + vclient := vaultclient.NewMockVaultClient() + ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t), NoopPrevAlloc{}) + return upd, ar +} + +func TestAllocRunner(t *testing.T, restarts bool) (*MockAllocStateUpdater, *AllocRunner) { + // Use mock driver + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config["run_for"] = "500ms" + return TestAllocRunnerFromAlloc(t, alloc, restarts) +} diff --git a/client/client.go b/client/client.go index 0cc2d8111..b6e30a8f6 100644 --- a/client/client.go +++ b/client/client.go @@ -21,8 +21,11 @@ import ( "github.com/hashicorp/consul/lib" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/servers" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" @@ -88,7 +91,7 @@ const ( type ClientStatsReporter interface { // GetAllocStats returns the AllocStatsReporter for the passed allocation. // If it does not exist an error is reported. - GetAllocStats(allocID string) (AllocStatsReporter, error) + GetAllocStats(allocID string) (allocrunner.AllocStatsReporter, error) // LatestHostStats returns the latest resource usage stats for the host LatestHostStats() *stats.HostStats @@ -145,7 +148,7 @@ type Client struct { // allocs maps alloc IDs to their AllocRunner. This map includes all // AllocRunners - running and GC'd - until the server GCs them. - allocs map[string]*AllocRunner + allocs map[string]*allocrunner.AllocRunner allocLock sync.RWMutex // allocUpdates stores allocations that need to be synced to the server. 
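The new client/allocrunner/testing.go above exports the alloc-runner test harness, so packages outside allocrunner can build runners without reaching into unexported fields. A rough sketch of how an external test might consume it; the assertions are illustrative, and only TestAllocRunner, Alloc, and MockAllocStateUpdater.Last come from this patch.

package example // illustrative sketch; not part of this patch

import (
    "testing"

    "github.com/hashicorp/nomad/client/allocrunner"
)

// TestExample_AllocRunnerHarness builds a runner from the exported helper and
// inspects it without starting it.
func TestExample_AllocRunnerHarness(t *testing.T) {
    // restarts=false rewrites the mock job as a no-restart batch job, as the
    // helper above shows.
    upd, ar := allocrunner.TestAllocRunner(t, false)

    if ar.Alloc() == nil {
        t.Fatal("expected the runner to carry its allocation")
    }

    // Nothing has called the update callback yet, so Last is nil.
    if last := upd.Last(); last != nil {
        t.Fatalf("expected no synced allocation yet, got %v", last)
    }
}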
@@ -153,7 +156,7 @@ type Client struct { // consulService is Nomad's custom Consul client for managing services // and checks. - consulService ConsulServiceAPI + consulService consulApi.ConsulServiceAPI // consulCatalog is the subset of Consul's Catalog API Nomad uses. consulCatalog consul.CatalogAPI @@ -193,7 +196,7 @@ var ( ) // NewClient is used to create a new client from the given configuration -func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService ConsulServiceAPI, logger *log.Logger) (*Client, error) { +func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI, logger *log.Logger) (*Client, error) { // Create the tls wrapper var tlsWrap tlsutil.RegionWrapper if cfg.TLSConfig.EnableRPC { @@ -217,7 +220,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic tlsWrap: tlsWrap, streamingRpcs: structs.NewStreamingRpcRegistry(), logger: logger, - allocs: make(map[string]*AllocRunner), + allocs: make(map[string]*allocrunner.AllocRunner), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), triggerDiscoveryCh: make(chan struct{}), @@ -562,7 +565,7 @@ func (c *Client) StatsReporter() ClientStatsReporter { return c } -func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) { +func (c *Client) GetAllocStats(allocID string) (allocrunner.AllocStatsReporter, error) { c.allocLock.RLock() defer c.allocLock.RUnlock() ar, ok := c.allocs[allocID] @@ -714,7 +717,7 @@ func (c *Client) restoreState() error { } else { // Normal path err := c.stateDB.View(func(tx *bolt.Tx) error { - allocs, err = getAllAllocationIDs(tx) + allocs, err = state.GetAllAllocationIDs(tx) if err != nil { return fmt.Errorf("failed to list allocations: %v", err) } @@ -731,10 +734,10 @@ func (c *Client) restoreState() error { alloc := &structs.Allocation{ID: id} // don't worry about blocking/migrating when restoring - watcher := noopPrevAlloc{} + watcher := allocrunner.NoopPrevAlloc{} c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher) + ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher) c.configLock.RUnlock() c.allocLock.Lock() @@ -778,7 +781,7 @@ func (c *Client) saveState() error { wg.Add(len(runners)) for id, ar := range runners { - go func(id string, ar *AllocRunner) { + go func(id string, ar *allocrunner.AllocRunner) { err := ar.SaveState() if err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %q: %v", id, err) @@ -795,10 +798,10 @@ func (c *Client) saveState() error { } // getAllocRunners returns a snapshot of the current set of alloc runners. -func (c *Client) getAllocRunners() map[string]*AllocRunner { +func (c *Client) getAllocRunners() map[string]*allocrunner.AllocRunner { c.allocLock.RLock() defer c.allocLock.RUnlock() - runners := make(map[string]*AllocRunner, len(c.allocs)) + runners := make(map[string]*allocrunner.AllocRunner, len(c.allocs)) for id, ar := range c.allocs { runners[id] = ar } @@ -1677,7 +1680,7 @@ OUTER: // allocation or if the alloc runner requires an updated allocation. runner, ok := runners[allocID] - if !ok || runner.shouldUpdate(modifyIndex) { + if !ok || runner.ShouldUpdate(modifyIndex) { // Only pull allocs that are required. Filtered // allocs might be at a higher index, so ignore // it. 
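restoreState above now reads the allocation index through the exported client/state package as well. A minimal sketch of that access pattern against a plain *bolt.DB; listAndPrune and shouldKeep are hypothetical names, and only GetAllAllocationIDs and DeleteAllocationBucket (renamed later in this patch) are taken from the diff.

package example // illustrative sketch; not part of this patch

import (
    "fmt"

    "github.com/boltdb/bolt"
    "github.com/hashicorp/nomad/client/state"
)

// listAndPrune lists every persisted allocation ID and deletes the bucket for
// any ID the hypothetical shouldKeep callback rejects.
func listAndPrune(db *bolt.DB, shouldKeep func(string) bool) error {
    var ids []string
    err := db.View(func(tx *bolt.Tx) error {
        var err error
        ids, err = state.GetAllAllocationIDs(tx)
        if err != nil {
            return fmt.Errorf("failed to list allocations: %v", err)
        }
        return nil
    })
    if err != nil {
        return err
    }

    // Deletion needs a writable transaction.
    return db.Update(func(tx *bolt.Tx) error {
        for _, id := range ids {
            if shouldKeep(id) {
                continue
            }
            if err := state.DeleteAllocationBucket(tx, id); err != nil {
                return err
            }
        }
        return nil
    })
}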
@@ -1810,7 +1813,7 @@ func (c *Client) runAllocs(update *allocUpdates) { c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) for _, ar := range c.allocs { - exist = append(exist, ar.alloc) + exist = append(exist, ar.Alloc()) } c.allocLock.RUnlock() @@ -1899,18 +1902,18 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error // get the previous alloc runner - if one exists - for the // blocking/migrating watcher - var prevAR *AllocRunner + var prevAR *allocrunner.AllocRunner if alloc.PreviousAllocation != "" { prevAR = c.allocs[alloc.PreviousAllocation] } c.configLock.RLock() - prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken) + prevAlloc := allocrunner.NewAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken) // Copy the config since the node can be swapped out as it is being updated. // The long term fix is to pass in the config and node separately and then // we don't have to do a copy. - ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) + ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) c.configLock.RUnlock() // Store the alloc runner. diff --git a/client/client_test.go b/client/client_test.go index f697972d7..5d44ad878 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -11,6 +11,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/uuid" @@ -603,8 +604,8 @@ func TestClient_SaveRestoreState(t *testing.T) { // Create a new client logger := log.New(c1.config.LogOutput, "", log.LstdFlags) catalog := consul.NewMockCatalog(logger) - mockService := newMockConsulServiceClient(t) - mockService.logger = logger + mockService := consulApi.NewMockConsulServiceClient(t) + mockService.Logger = logger c2, err := NewClient(c1.config, catalog, mockService, logger) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/consul.go b/client/consul/consul.go similarity index 96% rename from client/consul.go rename to client/consul/consul.go index 58f75e6f8..69f2f6e67 100644 --- a/client/consul.go +++ b/client/consul/consul.go @@ -1,4 +1,4 @@ -package client +package consul import ( "github.com/hashicorp/nomad/command/agent/consul" diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go new file mode 100644 index 000000000..e9f642112 --- /dev/null +++ b/client/consul/consul_testing.go @@ -0,0 +1,86 @@ +package consul + +import ( + "fmt" + "log" + "sync" + + "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/mitchellh/go-testing-interface" +) + +// MockConsulOp represents the register/deregister operations. +type MockConsulOp struct { + Op string // add, remove, or update + AllocID string + Task string +} + +func NewMockConsulOp(op, allocID, task string) MockConsulOp { + if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" { + panic(fmt.Errorf("invalid consul op: %s", op)) + } + return MockConsulOp{ + Op: op, + AllocID: allocID, + Task: task, + } +} + +// MockConsulServiceClient implements the ConsulServiceAPI interface to record +// and log task registration/deregistration. 
+type MockConsulServiceClient struct { + Ops []MockConsulOp + mu sync.Mutex + + Logger *log.Logger + + // AllocRegistrationsFn allows injecting return values for the + // AllocRegistrations function. + AllocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error) +} + +func NewMockConsulServiceClient(t testing.T) *MockConsulServiceClient { + m := MockConsulServiceClient{ + Ops: make([]MockConsulOp, 0, 20), + Logger: testlog.Logger(t), + } + return &m +} + +func (m *MockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error { + m.mu.Lock() + defer m.mu.Unlock() + m.Logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name) + m.Ops = append(m.Ops, NewMockConsulOp("update", new.AllocID, new.Name)) + return nil +} + +func (m *MockConsulServiceClient) RegisterTask(task *consul.TaskServices) error { + m.mu.Lock() + defer m.mu.Unlock() + m.Logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name) + m.Ops = append(m.Ops, NewMockConsulOp("add", task.AllocID, task.Name)) + return nil +} + +func (m *MockConsulServiceClient) RemoveTask(task *consul.TaskServices) { + m.mu.Lock() + defer m.mu.Unlock() + m.Logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name) + m.Ops = append(m.Ops, NewMockConsulOp("remove", task.AllocID, task.Name)) +} + +func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.Logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID) + m.Ops = append(m.Ops, NewMockConsulOp("alloc_registrations", allocID, "")) + + if m.AllocRegistrationsFn != nil { + return m.AllocRegistrationsFn(allocID) + } + + return nil, nil +} diff --git a/client/consul_testing.go b/client/consul_testing.go deleted file mode 100644 index 1db5d4ccf..000000000 --- a/client/consul_testing.go +++ /dev/null @@ -1,86 +0,0 @@ -package client - -import ( - "fmt" - "log" - "sync" - - "github.com/hashicorp/nomad/command/agent/consul" - "github.com/hashicorp/nomad/helper/testlog" - "github.com/mitchellh/go-testing-interface" -) - -// mockConsulOp represents the register/deregister operations. -type mockConsulOp struct { - op string // add, remove, or update - allocID string - task string -} - -func newMockConsulOp(op, allocID, task string) mockConsulOp { - if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" { - panic(fmt.Errorf("invalid consul op: %s", op)) - } - return mockConsulOp{ - op: op, - allocID: allocID, - task: task, - } -} - -// mockConsulServiceClient implements the ConsulServiceAPI interface to record -// and log task registration/deregistration. -type mockConsulServiceClient struct { - ops []mockConsulOp - mu sync.Mutex - - logger *log.Logger - - // allocRegistrationsFn allows injecting return values for the - // AllocRegistrations function. 
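Since the consul mock above is now exported from client/consul, other packages can assert on its recorded operations the same way the task runner test earlier in this patch does. A hedged sketch of that pattern; the TaskServices literal and the example values are assumptions, while NewMockConsulServiceClient, Ops, RegisterTask, and RemoveTask come from this patch.

package example // illustrative sketch; not part of this patch

import (
    "testing"

    consulApi "github.com/hashicorp/nomad/client/consul"
    "github.com/hashicorp/nomad/command/agent/consul"
)

// TestExample_MockConsulOps registers and removes a task against the mock and
// asserts on the recorded operations through the exported Ops field.
func TestExample_MockConsulOps(t *testing.T) {
    m := consulApi.NewMockConsulServiceClient(t)

    // TaskServices fields here are assumptions based on how the mock uses them.
    svcs := &consul.TaskServices{AllocID: "alloc-1234567", Name: "web"}
    if err := m.RegisterTask(svcs); err != nil {
        t.Fatalf("RegisterTask: %v", err)
    }
    m.RemoveTask(svcs)

    if expected := 2; len(m.Ops) != expected {
        t.Fatalf("expected %d consul ops but found %d", expected, len(m.Ops))
    }
    if m.Ops[0].Op != "add" || m.Ops[1].Op != "remove" {
        t.Fatalf("unexpected op order: %+v", m.Ops)
    }
}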
- allocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error) -} - -func newMockConsulServiceClient(t testing.T) *mockConsulServiceClient { - m := mockConsulServiceClient{ - ops: make([]mockConsulOp, 0, 20), - logger: testlog.Logger(t), - } - return &m -} - -func (m *mockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name) - m.ops = append(m.ops, newMockConsulOp("update", new.AllocID, new.Name)) - return nil -} - -func (m *mockConsulServiceClient) RegisterTask(task *consul.TaskServices) error { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name) - m.ops = append(m.ops, newMockConsulOp("add", task.AllocID, task.Name)) - return nil -} - -func (m *mockConsulServiceClient) RemoveTask(task *consul.TaskServices) { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name) - m.ops = append(m.ops, newMockConsulOp("remove", task.AllocID, task.Name)) -} - -func (m *mockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) { - m.mu.Lock() - defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID) - m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, "")) - - if m.allocRegistrationsFn != nil { - return m.allocRegistrationsFn(allocID) - } - - return nil, nil -} diff --git a/client/fingerprint_manager_test.go b/client/fingerprint_manager_test.go index 1a8ccd9b3..464a5d186 100644 --- a/client/fingerprint_manager_test.go +++ b/client/fingerprint_manager_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -25,7 +26,7 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) { testClient.shutdownCh, testClient.updateNodeFromFingerprint, testClient.updateNodeFromDriver, - testLogger(), + testlog.Logger(t), ) err := fm.Run() @@ -43,7 +44,7 @@ func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { require := require.New(t) testClient := TestClient(t, nil) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -70,7 +71,7 @@ func TestFingerprintManager_Fingerprint_Run(t *testing.T) { require := require.New(t) testClient := TestClient(t, nil) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -102,7 +103,7 @@ func TestFingerprintManager_Fingerprint_Periodic(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -164,7 +165,7 @@ func TestFingerprintManager_HealthCheck_Driver(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -264,7 +265,7 @@ func TestFingerprintManager_HealthCheck_Periodic(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -360,7 +361,7 @@ func TestFimgerprintManager_Run_InWhitelist(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger 
= testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -390,7 +391,7 @@ func TestFingerprintManager_Run_InBlacklist(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -422,7 +423,7 @@ func TestFingerprintManager_Run_Combination(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -455,7 +456,7 @@ func TestFingerprintManager_Run_WhitelistDrivers(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -485,7 +486,7 @@ func TestFingerprintManager_Run_AllDriversBlacklisted(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -519,7 +520,7 @@ func TestFingerprintManager_Run_DriversWhiteListBlacklistCombination(t *testing. } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( @@ -552,7 +553,7 @@ func TestFingerprintManager_Run_DriversInBlacklist(t *testing.T) { } }) - testClient.logger = testLogger() + testClient.logger = testlog.Logger(t) defer testClient.Shutdown() fm := NewFingerprintManager( diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 4e90daf06..0b371324e 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -92,7 +92,17 @@ func TestFS_Stat(t *testing.T) { return false, fmt.Errorf("alloc doesn't exist") } - return len(ar.tasks) != 0, fmt.Errorf("tasks not running") + alloc := ar.Alloc() + running := false + for _, s := range alloc.TaskStates { + if s.State == structs.TaskStateRunning { + running = true + } else { + running = false + } + } + + return running, fmt.Errorf("tasks not running") }, func(err error) { t.Fatal(err) }) @@ -217,7 +227,17 @@ func TestFS_List(t *testing.T) { return false, fmt.Errorf("alloc doesn't exist") } - return len(ar.tasks) != 0, fmt.Errorf("tasks not running") + alloc := ar.Alloc() + running := false + for _, s := range alloc.TaskStates { + if s.State == structs.TaskStateRunning { + running = true + } else { + running = false + } + } + + return running, fmt.Errorf("tasks not running") }, func(err error) { t.Fatal(err) }) diff --git a/client/gc.go b/client/gc.go index 7eab6fe03..3981d8c96 100644 --- a/client/gc.go +++ b/client/gc.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" ) @@ -169,7 +170,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { // destroyAllocRunner is used to destroy an allocation runner. It will acquire a // lock to restrict parallelism and then destroy the alloc runner, returning // once the allocation has been destroyed. 
-func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason string) { +func (a *AllocGarbageCollector) destroyAllocRunner(ar *allocrunner.AllocRunner, reason string) { id := "" if alloc := ar.Alloc(); alloc != nil { id = alloc.ID @@ -327,7 +328,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e } // MarkForCollection starts tracking an allocation for Garbage Collection -func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) { +func (a *AllocGarbageCollector) MarkForCollection(ar *allocrunner.AllocRunner) { if ar.Alloc() == nil { a.destroyAllocRunner(ar, "alloc is nil") return @@ -342,7 +343,7 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) { // a PQ type GCAlloc struct { timeStamp time.Time - allocRunner *AllocRunner + allocRunner *allocrunner.AllocRunner index int } @@ -396,7 +397,7 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { // Push an alloc runner into the GC queue. Returns true if alloc was added, // false if the alloc already existed. -func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) bool { +func (i *IndexedGCAllocPQ) Push(ar *allocrunner.AllocRunner) bool { i.pqLock.Lock() defer i.pqLock.Unlock() diff --git a/client/gc_test.go b/client/gc_test.go index f94b8bb7b..a356e528c 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -1,17 +1,6 @@ package client -import ( - "fmt" - "testing" - "time" - - "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/stats" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" -) - +/* func gcConfig() *GCConfig { return &GCConfig{ DiskUsageThreshold: 80, @@ -26,10 +15,10 @@ func TestIndexedGCAllocPQ(t *testing.T) { t.Parallel() pq := NewIndexedGCAllocPQ() - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) - _, ar3 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) - _, ar4 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar3 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar4 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) pq.Push(ar1) pq.Push(ar2) @@ -105,10 +94,10 @@ func (m *MockStatsCollector) Stats() *stats.HostStats { func TestAllocGarbageCollector_MarkForCollection(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig()) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) gc.MarkForCollection(ar1) gcAlloc := gc.allocRunners.Pop() @@ -119,11 +108,11 @@ func TestAllocGarbageCollector_MarkForCollection(t *testing.T) { func TestAllocGarbageCollector_Collect(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig()) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -140,11 +129,11 @@ func 
TestAllocGarbageCollector_Collect(t *testing.T) { func TestAllocGarbageCollector_CollectAll(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig()) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -157,15 +146,15 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) { func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) statsCollector := &MockStatsCollector{} conf := gcConfig() conf.ReservedDiskMB = 20 gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar1.waitCh) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar2.waitCh) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -192,15 +181,15 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) statsCollector := &MockStatsCollector{} conf := gcConfig() conf.ReservedDiskMB = 20 gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar1.waitCh) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar2.waitCh) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -228,15 +217,15 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) { func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) statsCollector := &MockStatsCollector{} conf := gcConfig() conf.ReservedDiskMB = 20 gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar1.waitCh) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar2.waitCh) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -260,15 +249,15 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) { func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) statsCollector := &MockStatsCollector{} conf := gcConfig() conf.ReservedDiskMB = 20 gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar1.waitCh) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := 
allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar2.waitCh) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -419,15 +408,15 @@ func TestAllocGarbageCollector_MaxAllocs(t *testing.T) { func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) statsCollector := &MockStatsCollector{} conf := gcConfig() conf.ReservedDiskMB = 20 gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar1.waitCh) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar2.waitCh) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -451,15 +440,15 @@ func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { t.Parallel() - logger := testLogger() + logger := testlog.Logger(t) statsCollector := &MockStatsCollector{} conf := gcConfig() conf.ReservedDiskMB = 20 gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf) - _, ar1 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar1 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar1.waitCh) - _, ar2 := testAllocRunnerFromAlloc(t, mock.Alloc(), false) + _, ar2 := allocrunner.TestAllocRunnerFromAlloc(t, mock.Alloc(), false) close(ar2.waitCh) gc.MarkForCollection(ar1) gc.MarkForCollection(ar2) @@ -482,3 +471,4 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { t.Fatalf("gcAlloc: %v", gcAlloc) } } +*/ diff --git a/client/state_database.go b/client/state/state_database.go similarity index 83% rename from client/state_database.go rename to client/state/state_database.go index a9a36a5f9..4c3d9c2ed 100644 --- a/client/state_database.go +++ b/client/state/state_database.go @@ -1,4 +1,4 @@ -package client +package state import ( "bytes" @@ -25,7 +25,7 @@ var ( allocationsBucket = []byte("allocations") ) -func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { +func PutObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { if !bkt.Writable() { return fmt.Errorf("bucket must be writable") } @@ -43,7 +43,7 @@ func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { return nil } -func putData(bkt *bolt.Bucket, key, value []byte) error { +func PutData(bkt *bolt.Bucket, key, value []byte) error { if !bkt.Writable() { return fmt.Errorf("bucket must be writable") } @@ -55,7 +55,7 @@ func putData(bkt *bolt.Bucket, key, value []byte) error { return nil } -func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { +func GetObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { // Get the data data := bkt.Get(key) if data == nil { @@ -70,11 +70,11 @@ func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { return nil } -// getAllocationBucket returns the bucket used to persist state about a +// GetAllocationBucket returns the bucket used to persist state about a // particular allocation. If the root allocation bucket or the specific // allocation bucket doesn't exist, it will be created as long as the // transaction is writable. 
-func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { +func GetAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { var err error w := tx.Writable() @@ -108,12 +108,12 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { return alloc, nil } -// getTaskBucket returns the bucket used to persist state about a +// GetTaskBucket returns the bucket used to persist state about a // particular task. If the root allocation bucket, the specific // allocation or task bucket doesn't exist, they will be created as long as the // transaction is writable. -func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { - alloc, err := getAllocationBucket(tx, allocID) +func GetTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { + alloc, err := GetAllocationBucket(tx, allocID) if err != nil { return nil, err } @@ -136,8 +136,8 @@ func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) return task, nil } -// deleteAllocationBucket is used to delete an allocation bucket if it exists. -func deleteAllocationBucket(tx *bolt.Tx, allocID string) error { +// DeleteAllocationBucket is used to delete an allocation bucket if it exists. +func DeleteAllocationBucket(tx *bolt.Tx, allocID string) error { if !tx.Writable() { return fmt.Errorf("transaction must be writable") } @@ -157,8 +157,8 @@ func deleteAllocationBucket(tx *bolt.Tx, allocID string) error { return allocations.DeleteBucket(key) } -// deleteTaskBucket is used to delete a task bucket if it exists. -func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { +// DeleteTaskBucket is used to delete a task bucket if it exists. +func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { if !tx.Writable() { return fmt.Errorf("transaction must be writable") } @@ -184,7 +184,7 @@ func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { return alloc.DeleteBucket(key) } -func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) { +func GetAllAllocationIDs(tx *bolt.Tx) ([]string, error) { allocationsBkt := tx.Bucket(allocationsBucket) if allocationsBkt == nil { return nil, nil diff --git a/client/testing.go b/client/testing.go index 4043da298..5b6b87659 100644 --- a/client/testing.go +++ b/client/testing.go @@ -2,6 +2,7 @@ package client import ( "github.com/hashicorp/nomad/client/config" + consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -37,8 +38,8 @@ func TestClient(t testing.T, cb func(c *config.Config)) *Client { logger := testlog.Logger(t) catalog := consul.NewMockCatalog(logger) - mockService := newMockConsulServiceClient(t) - mockService.logger = logger + mockService := consulApi.NewMockConsulServiceClient(t) + mockService.Logger = logger client, err := NewClient(conf, catalog, mockService, logger) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/util.go b/client/util.go index 32f765550..02ce36a5e 100644 --- a/client/util.go +++ b/client/util.go @@ -1,9 +1,7 @@ package client import ( - "encoding/json" "fmt" - "io/ioutil" "math/rand" "github.com/hashicorp/nomad/nomad/structs" @@ -73,16 +71,3 @@ func shuffleStrings(list []string) { list[i], list[j] = list[j], list[i] } } - -// pre060RestoreState is used to read back in the persisted state for pre v0.6.0 -// state -func pre060RestoreState(path string, data interface{}) error { - buf, err := 
ioutil.ReadFile(path) - if err != nil { - return err - } - if err := json.Unmarshal(buf, data); err != nil { - return fmt.Errorf("failed to decode state: %v", err) - } - return nil -} diff --git a/command/alloc_status.go b/command/alloc_status.go index 677200da9..773791c59 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -12,7 +12,7 @@ import ( "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" - "github.com/hashicorp/nomad/client" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts" "github.com/posener/complete" ) @@ -425,7 +425,7 @@ func buildDisplayMessage(event *api.TaskEvent) string { desc = strings.Join(parts, ", ") case api.TaskRestarting: in := fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay)) - if event.RestartReason != "" && event.RestartReason != client.ReasonWithinPolicy { + if event.RestartReason != "" && event.RestartReason != restarts.ReasonWithinPolicy { desc = fmt.Sprintf("%s - %s", event.RestartReason, in) } else { desc = in
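Finally, the restart tracker now lives in client/allocrunner/taskrunner/restarts, which is why command/alloc_status.go switches its import above. A small sketch of the exported surface referenced in this patch (NewRestartTracker, GetPolicy, ReasonWithinPolicy); describeRestart, newBatchTracker, and policyMode are illustrative names only.

package example // illustrative sketch; not part of this patch

import (
    "fmt"

    "github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
    "github.com/hashicorp/nomad/nomad/structs"
)

// describeRestart mirrors the alloc-status branch above: only surface the
// restart reason when it is something other than "restarting within policy".
func describeRestart(reason, in string) string {
    if reason != "" && reason != restarts.ReasonWithinPolicy {
        return fmt.Sprintf("%s - %s", reason, in)
    }
    return in
}

// newBatchTracker builds a tracker that never restarts, matching the
// noRestartsTracker test helper earlier in this patch.
func newBatchTracker() *restarts.RestartTracker {
    policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
    return restarts.NewRestartTracker(policy, structs.JobTypeBatch)
}

// policyMode uses the exported accessor that replaces direct reads of the
// tracker's unexported policy field.
func policyMode(rt *restarts.RestartTracker) string {
    return rt.GetPolicy().Mode
}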