client: implement all-or-nothing alloc restoration
Restoring calls NewAR -> Restore -> Run

NewAR now calls NewTR
AR.Restore calls TR.Restore
AR.Run calls TR.Run
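For orientation, the sequence the commit message describes can be sketched in a few lines of Go. Everything below (Config, AllocRunner, NewAllocRunner, restoreAll) is a simplified stand-in for the real Nomad types, not the actual API; it only illustrates the ordering:

package main

import "fmt"

// Illustrative stand-ins for the real Nomad types; not the actual API.
type Config struct{ AllocID string }

type AllocRunner struct{ id string }

// NewAllocRunner also creates (but never runs) its task runners.
func NewAllocRunner(c *Config) (*AllocRunner, error) {
	return &AllocRunner{id: c.AllocID}, nil
}

func (ar *AllocRunner) Restore() error { return nil } // AR.Restore calls TR.Restore on each task
func (ar *AllocRunner) Run()           { fmt.Println("running", ar.id) }

// restoreAll sketches the all-or-nothing flow: construct and restore every
// runner first, and only start any of them once all restores succeeded.
func restoreAll(configs []*Config) error {
	runners := make([]*AllocRunner, 0, len(configs))
	for _, cfg := range configs {
		ar, err := NewAllocRunner(cfg)
		if err != nil {
			return fmt.Errorf("failed to create alloc %q: %v", cfg.AllocID, err)
		}
		if err := ar.Restore(); err != nil {
			return fmt.Errorf("failed to restore alloc %q: %v", cfg.AllocID, err)
		}
		runners = append(runners, ar)
	}
	// All allocs restored successfully, run them (AR.Run calls TR.Run).
	for _, ar := range runners {
		go ar.Run()
	}
	return nil
}

func main() {
	// Toy driver; in a real agent the runners would outlive this function.
	if err := restoreAll([]*Config{{AllocID: "a1"}, {AllocID: "a2"}}); err != nil {
		fmt.Println("restore failed:", err)
	}
}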
@@ -13,6 +13,7 @@ import (
 	"github.com/hashicorp/nomad/client/allocrunnerv2/state"
 	"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
 	"github.com/hashicorp/nomad/client/config"
+	clientstate "github.com/hashicorp/nomad/client/state"
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/client/vaultclient"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -57,12 +58,18 @@ type allocRunner struct {
 }
 
 // NewAllocRunner returns a new allocation runner.
-func NewAllocRunner(config *Config) *allocRunner {
+func NewAllocRunner(config *Config) (*allocRunner, error) {
+	alloc := config.Alloc
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	if tg == nil {
+		return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
+	}
+
 	ar := &allocRunner{
+		alloc:        alloc,
 		clientConfig: config.ClientConfig,
 		vaultClient:  config.Vault,
-		alloc:        config.Alloc,
-		tasks:        make(map[string]*taskrunner.TaskRunner),
+		tasks:        make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
 		waitCh:       make(chan struct{}),
 		updateCh:     make(chan *structs.Allocation),
 		stateDB:      config.StateDB,
@@ -70,15 +77,44 @@ func NewAllocRunner(config *Config) *allocRunner {
 
 	// Create alloc dir
 	//XXX update AllocDir to hc log
-	ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, config.Alloc.ID))
+	ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))
 
 	// Create the logger based on the allocation ID
-	ar.logger = config.Logger.With("alloc_id", config.Alloc.ID)
+	ar.logger = config.Logger.With("alloc_id", alloc.ID)
 
 	// Initialize the runners hooks.
 	ar.initRunnerHooks()
 
-	return ar
+	// Create the TaskRunners
+	if err := ar.initTaskRunners(tg.Tasks); err != nil {
+		return nil, err
+	}
+
+	return ar, nil
 }
 
+// initTaskRunners creates task runners but does *not* run them.
+func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
+	for _, task := range tasks {
+		config := &taskrunner.Config{
+			Alloc:        ar.alloc,
+			ClientConfig: ar.clientConfig,
+			Task:         task,
+			TaskDir:      ar.allocDir.NewTaskDir(task.Name),
+			Logger:       ar.logger,
+			StateDB:      ar.stateDB,
+			VaultClient:  ar.vaultClient,
+		}
+
+		// Create, but do not Run, the task runner
+		tr, err := taskrunner.NewTaskRunner(config)
+		if err != nil {
+			return fmt.Errorf("failed creating runner for task %q: %v", task.Name, err)
+		}
+
+		ar.tasks[task.Name] = tr
+	}
+	return nil
+}
+
 func (ar *allocRunner) WaitCh() <-chan struct{} {
@@ -91,7 +127,6 @@ func (ar *allocRunner) Run() {
 	// Close the wait channel
 	defer close(ar.waitCh)
 
-	var err error
 	var taskWaitCh <-chan struct{}
 
 	// Run the prestart hooks
@@ -102,10 +137,7 @@ func (ar *allocRunner) Run() {
 	}
 
 	// Run the runners
-	taskWaitCh, err = ar.runImpl()
-	if err != nil {
-		ar.logger.Error("starting tasks failed", "error", err)
-	}
+	taskWaitCh = ar.runImpl()
 
 	for {
 		select {
@@ -130,20 +162,9 @@ POST:
 }
 
 // runImpl is used to run the runners.
-func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
-	// Grab the task group
-	alloc := ar.Alloc()
-	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
-	if tg == nil {
-		// XXX Fail and exit
-		ar.logger.Error("failed to lookup task group", "task_group", alloc.TaskGroup)
-		return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
-	}
-
-	for _, task := range tg.Tasks {
-		if err := ar.runTask(alloc, task); err != nil {
-			return nil, err
-		}
+func (ar *allocRunner) runImpl() <-chan struct{} {
+	for _, task := range ar.tasks {
+		go task.Run()
 	}
 
 	// Return a combined WaitCh that is closed when all task runners have
@@ -156,32 +177,7 @@ func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
 	}
 	}()
 
-	return waitCh, nil
-}
-
-// runTask is used to run a task.
-func (ar *allocRunner) runTask(alloc *structs.Allocation, task *structs.Task) error {
-	// Create the runner
-	config := &taskrunner.Config{
-		Alloc:        alloc,
-		ClientConfig: ar.clientConfig,
-		Task:         task,
-		TaskDir:      ar.allocDir.NewTaskDir(task.Name),
-		Logger:       ar.logger,
-		StateDB:      ar.stateDB,
-		VaultClient:  ar.vaultClient,
-	}
-	tr, err := taskrunner.NewTaskRunner(config)
-	if err != nil {
-		return err
-	}
-
-	// Start the runner
-	go tr.Run()
-
-	// Store the runner
-	ar.tasks[task.Name] = tr
-	return nil
+	return waitCh
 }
 
 // Alloc returns the current allocation being run by this runner.
@@ -193,9 +189,29 @@ func (ar *allocRunner) Alloc() *structs.Allocation {
 }
 
 // SaveState does all the state related stuff. Who knows. FIXME
 //XXX
+//XXX do we need to do periodic syncing? if Saving is only called *before* Run
+// *and* within Run -- *and* Updates are applied within Run -- we may be able to
+// skip quite a bit of locking? maybe?
 func (ar *allocRunner) SaveState() error {
-	return nil
+	return ar.stateDB.Update(func(tx *bolt.Tx) error {
+		//XXX Track EvalID to only write alloc on change?
+		// Write the allocation
+		return clientstate.PutAllocation(tx, ar.Alloc())
+	})
 }
 
+// Restore state from database. Must be called after NewAllocRunner but before
+// Run.
+func (ar *allocRunner) Restore() error {
+	return ar.stateDB.View(func(tx *bolt.Tx) error {
+		// Restore task runners
+		for _, tr := range ar.tasks {
+			if err := tr.Restore(tx); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
 // Update the running allocation with a new version received from the server.
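SaveState and Restore above follow BoltDB's standard transaction split: writes go through db.Update (read-write, commits only if the closure returns nil) and reads through db.View (read-only). A minimal standalone sketch of that pattern; the bucket and key names here are illustrative, not Nomad's actual schema:

package main

import (
	"fmt"
	"log"

	bolt "github.com/boltdb/bolt"
)

func main() {
	// Open (or create) the state file, as the client does for its state DB.
	db, err := bolt.Open("state.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// SaveState-style write: Update runs a read-write transaction.
	err = db.Update(func(tx *bolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists([]byte("allocations")) // illustrative bucket name
		if err != nil {
			return err
		}
		return bkt.Put([]byte("alloc-1"), []byte(`{"id":"alloc-1"}`))
	})
	if err != nil {
		log.Fatal(err)
	}

	// Restore-style read: View runs a read-only transaction.
	err = db.View(func(tx *bolt.Tx) error {
		bkt := tx.Bucket([]byte("allocations"))
		if bkt == nil {
			return fmt.Errorf("allocations bucket missing")
		}
		fmt.Printf("restored: %s\n", bkt.Get([]byte("alloc-1")))
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}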
@@ -5,6 +5,13 @@ import (
 	"github.com/hashicorp/nomad/helper"
 )
 
+var (
+	// taskRunnerStateAllKey holds all the task runners state. At the moment
+	// there is no need to split it
+	//XXX refactor out of client/state and taskrunner
+	taskRunnerStateAllKey = []byte("simple-all")
+)
+
 // LocalState is Task state which is persisted for use when restarting Nomad
 // agents.
 type LocalState struct {
@@ -18,7 +18,7 @@ import (
 	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/driver"
 	"github.com/hashicorp/nomad/client/driver/env"
-	oldstate "github.com/hashicorp/nomad/client/state"
+	clientstate "github.com/hashicorp/nomad/client/state"
 	"github.com/hashicorp/nomad/client/vaultclient"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/ugorji/go/codec"
@@ -42,6 +42,7 @@ const (
 var (
 	// taskRunnerStateAllKey holds all the task runners state. At the moment
 	// there is no need to split it
+	//XXX refactor out of clientstate and new state
 	taskRunnerStateAllKey = []byte("simple-all")
 )
 
@@ -446,16 +447,15 @@ func (tr *TaskRunner) persistLocalState() error {
 		return nil
 	}
 
-	// Start the transaction.
-	return tr.stateDB.Batch(func(tx *bolt.Tx) error {
+	return tr.stateDB.Update(func(tx *bolt.Tx) error {
 		// Grab the task bucket
 		//XXX move into new state pkg
-		taskBkt, err := oldstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
+		taskBkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
 		if err != nil {
 			return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
 		}
 
-		if err := oldstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
+		if err := clientstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
 			return fmt.Errorf("failed to write task_runner state: %v", err)
 		}
 
@@ -468,6 +468,24 @@ func (tr *TaskRunner) persistLocalState() error {
 	})
 }
 
+// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
+// but before Run.
+func (tr *TaskRunner) Restore(tx *bolt.Tx) error {
+	bkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
+	if err != nil {
+		return fmt.Errorf("failed to get task %q bucket: %v", tr.taskName, err)
+	}
+
+	//XXX set persisted hash to avoid immediate write on first use?
+	var ls state.LocalState
+	if err := clientstate.GetObject(bkt, taskRunnerStateAllKey, &ls); err != nil {
+		return fmt.Errorf("failed to read task runner state: %v", err)
+	}
+	tr.localState = &ls
+
+	return nil
+}
+
 // SetState sets the task runners allocation state.
 func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
 	// Ensure the event is populated with human readable strings
@@ -124,8 +124,9 @@ func (tr *TaskRunner) prestart() error {
 		}
 		tr.localStateLock.Unlock()
 
-		// Persist local state if the hook state has changed
+		// Store and persist local state if the hook state has changed
 		if !hookState.Equal(origHookState) {
+			tr.localState.Hooks[name] = hookState
 			if err := tr.persistLocalState(); err != nil {
 				return err
 			}
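The prestart change above persists hook state only when it differs from the last persisted copy, so unchanged hooks cost no state-DB write. A toy sketch of that change-detection pattern; HookState and runner here are simplified stand-ins, not Nomad's real hook-state types:

package main

import "fmt"

// HookState is a simplified stand-in for the per-hook state Nomad tracks.
type HookState struct {
	PrestartDone bool
}

// Equal reports whether two hook states match; nil-safe.
func (h *HookState) Equal(o *HookState) bool {
	if h == nil || o == nil {
		return h == o
	}
	return h.PrestartDone == o.PrestartDone
}

type runner struct {
	hooks    map[string]*HookState
	persists int // counts stand-in state-DB writes
}

func (r *runner) persistLocalState() error {
	r.persists++ // the real code serializes localState into BoltDB here
	return nil
}

// afterHook mirrors the prestart loop: store and persist local state only
// if the hook state has changed since it was last persisted.
func (r *runner) afterHook(name string, origHookState, hookState *HookState) error {
	if !hookState.Equal(origHookState) {
		r.hooks[name] = hookState
		return r.persistLocalState()
	}
	return nil
}

func main() {
	r := &runner{hooks: map[string]*HookState{}}
	s := &HookState{PrestartDone: true}
	r.afterHook("task_dir", nil, s) // changed: persisted
	r.afterHook("task_dir", s, s)   // unchanged: write skipped
	fmt.Println("writes:", r.persists) // writes: 1
}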
client/client.go
@@ -32,7 +32,7 @@ import (
 	"github.com/hashicorp/nomad/client/allocrunnerv2"
 	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/servers"
-	"github.com/hashicorp/nomad/client/state"
+	clientstate "github.com/hashicorp/nomad/client/state"
 	"github.com/hashicorp/nomad/client/stats"
 	"github.com/hashicorp/nomad/client/vaultclient"
 	"github.com/hashicorp/nomad/command/agent/consul"
@@ -110,6 +110,8 @@ type AllocRunner interface {
 	SaveState() error
 	Update(*structs.Allocation)
 	Alloc() *structs.Allocation
+	Restore() error
+	Run()
 }
 
 // Client is used to implement the client interaction with Nomad. Clients
@@ -725,6 +727,7 @@ func (c *Client) restoreState() error {
 		return nil
 	}
 
+	//XXX REMOVED! make a note in backward compat / upgrading doc
 	// COMPAT: Remove in 0.7.0
 	// 0.6.0 transitioned from individual state files to a single bolt-db.
 	// The upgrade path is to:
@@ -732,74 +735,72 @@ func (c *Client) restoreState() error {
 	// If so, restore from that and delete old state
 	// Restore using state database
 
-	// Allocs holds the IDs of the allocations being restored
-	var allocs []string
-
-	// Upgrading tracks whether this is a pre 0.6.0 upgrade path
-	var upgrading bool
-
-	// Scan the directory
-	allocDir := filepath.Join(c.config.StateDir, "alloc")
-	list, err := ioutil.ReadDir(allocDir)
-	if err != nil && !os.IsNotExist(err) {
-		return fmt.Errorf("failed to list alloc state: %v", err)
-	} else if err == nil && len(list) != 0 {
-		upgrading = true
-		for _, entry := range list {
-			allocs = append(allocs, entry.Name())
-		}
-	} else {
-		// Normal path
-		err := c.stateDB.View(func(tx *bolt.Tx) error {
-			allocs, err = state.GetAllAllocationIDs(tx)
-			if err != nil {
-				return fmt.Errorf("failed to list allocations: %v", err)
-			}
-			return nil
-		})
+	// Restore allocations
+	var allocs []*structs.Allocation
+	var err error
+	err = c.stateDB.View(func(tx *bolt.Tx) error {
+		allocs, err = clientstate.GetAllAllocations(tx)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to list allocations: %v", err)
 		}
+		return nil
+	})
+	if err != nil {
+		return err
 	}
 
-	// Load each alloc back
 	var mErr multierror.Error
-	for _, id := range allocs {
-		alloc := &structs.Allocation{ID: id}
-
-		// don't worry about blocking/migrating when restoring
-		watcher := allocrunner.NoopPrevAlloc{}
+	for _, alloc := range allocs {
+		//XXX FIXME create a root logger
+		logger := hclog.New(&hclog.LoggerOptions{
+			Name:       "nomad",
+			Level:      hclog.LevelFromString(c.configCopy.LogLevel),
+			TimeFormat: time.RFC3339,
+		})
 
 		c.configLock.RLock()
-		ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
+		arConf := &allocrunnerv2.Config{
+			Alloc:        alloc,
+			Logger:       logger,
+			ClientConfig: c.config,
+			StateDB:      c.stateDB,
+		}
 		c.configLock.RUnlock()
 
+		ar, err := allocrunnerv2.NewAllocRunner(arConf)
+		if err != nil {
+			c.logger.Printf("[ERR] client: failed to create alloc %q: %v", alloc.ID, err)
+			mErr.Errors = append(mErr.Errors, err)
+			continue
+		}
+
+		// Restore state
+		if err := ar.Restore(); err != nil {
+			c.logger.Printf("[ERR] client: failed to restore alloc %q: %v", alloc.ID, err)
+			mErr.Errors = append(mErr.Errors, err)
+			continue
+		}
+
+		//XXX is this locking necessary?
 		c.allocLock.Lock()
-		c.allocs[id] = ar
+		c.allocs[alloc.ID] = ar
 		c.allocLock.Unlock()
-
-		if err := ar.RestoreState(); err != nil {
-			c.logger.Printf("[ERR] client: failed to restore state for alloc %q: %v", id, err)
-			mErr.Errors = append(mErr.Errors, err)
-		} else {
-			go ar.Run()
-
-			if upgrading {
-				if err := ar.SaveState(); err != nil {
-					c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", id, err)
-				}
-			}
-		}
 	}
 
-	// Delete all the entries
-	if upgrading {
-		if err := os.RemoveAll(allocDir); err != nil {
-			mErr.Errors = append(mErr.Errors, err)
-		}
+	// Don't run any allocs if there were any failures
+	//XXX removing this check would switch from all-or-nothing restores to
+	// best-effort. went with all-or-nothing for now
+	if err := mErr.ErrorOrNil(); err != nil {
+		return err
 	}
 
-	return mErr.ErrorOrNil()
+	// All allocs restored successfully, run them!
+	for _, ar := range c.allocs {
+		go ar.Run()
+	}
+
+	return nil
 }
 
 // saveState is used to snapshot our state into the data dir.
@@ -1918,10 +1919,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
 	// The long term fix is to pass in the config and node separately and then
 	// we don't have to do a copy.
 	//ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
-	//XXX FIXME
+	//XXX FIXME create a root logger
 	logger := hclog.New(&hclog.LoggerOptions{
-		Name:  "nomad",
-		Level: hclog.LevelFromString(c.configCopy.LogLevel),
+		Name:       "nomad",
+		Level:      hclog.LevelFromString(c.configCopy.LogLevel),
+		TimeFormat: time.RFC3339,
 	})
 
 	c.configLock.RLock()
@@ -1934,12 +1936,15 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
 	}
 	c.configLock.RUnlock()
 
-	ar := allocrunnerv2.NewAllocRunner(arConf)
+	ar, err := allocrunnerv2.NewAllocRunner(arConf)
+	if err != nil {
+		return err
+	}
 
 	// Store the alloc runner.
 	c.allocs[alloc.ID] = ar
 
 	//XXX(schmichael) Why do we do this?
 	// Initialize local state
 	if err := ar.SaveState(); err != nil {
 		c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
 	}
@@ -184,20 +184,65 @@ func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
 	return alloc.DeleteBucket(key)
 }
 
-func GetAllAllocationIDs(tx *bolt.Tx) ([]string, error) {
+//XXX duplicated in arv2?!
+var (
+	// The following are the key paths written to the state database
+	allocRunnerStateAllocKey = []byte("alloc")
+)
+
+type allocRunnerAllocState struct {
+	Alloc *structs.Allocation
+}
+
+func GetAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, error) {
 	allocationsBkt := tx.Bucket(allocationsBucket)
 	if allocationsBkt == nil {
 		// No allocs
 		return nil, nil
 	}
 
+	var allocs []*structs.Allocation
+
 	// Create a cursor for iteration.
-	var allocIDs []string
 	c := allocationsBkt.Cursor()
 
-	// Iterate over all the buckets
+	// Iterate over all the allocation buckets
 	for k, _ := c.First(); k != nil; k, _ = c.Next() {
-		allocIDs = append(allocIDs, string(k))
+		allocID := string(k)
+		allocBkt := allocationsBkt.Bucket(k)
+		if allocBkt == nil {
+			//XXX merr?
+			return nil, fmt.Errorf("alloc %q missing", allocID)
+		}
+
+		var allocState allocRunnerAllocState
+		if err := GetObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil {
+			//XXX merr?
+			return nil, fmt.Errorf("failed to restore alloc %q: %v", allocID, err)
+		}
+		allocs = append(allocs, allocState.Alloc)
 	}
 
-	return allocIDs, nil
+	return allocs, nil
 }
+
+// PutAllocation stores an allocation given a writable transaction.
+func PutAllocation(tx *bolt.Tx, alloc *structs.Allocation) error {
+	// Retrieve the root allocations bucket
+	allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket)
+	if err != nil {
+		return err
+	}
+
+	// Retrieve the specific allocations bucket
+	key := []byte(alloc.ID)
+	allocBkt, err := allocsBkt.CreateBucketIfNotExists(key)
+	if err != nil {
+		return err
+	}
+
+	allocState := allocRunnerAllocState{
+		Alloc: alloc,
+	}
+	return PutObject(allocBkt, allocRunnerStateAllocKey, &allocState)
+}
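To illustrate the bucket layout PutAllocation and GetAllAllocations imply (a root allocations bucket holding one sub-bucket per alloc ID, each storing the alloc under a fixed key), here is a hedged, self-contained round-trip sketch. encoding/json stands in for whatever encoding PutObject/GetObject actually use, and Allocation is a simplified stand-in for structs.Allocation:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	bolt "github.com/boltdb/bolt"
)

// Allocation is a simplified stand-in for structs.Allocation.
type Allocation struct {
	ID string
}

var (
	allocationsBucket        = []byte("allocations")
	allocRunnerStateAllocKey = []byte("alloc")
)

// putAllocation mirrors the PutAllocation pattern: one sub-bucket per alloc
// ID under a root bucket, with the alloc serialized under a fixed key.
func putAllocation(tx *bolt.Tx, alloc *Allocation) error {
	root, err := tx.CreateBucketIfNotExists(allocationsBucket)
	if err != nil {
		return err
	}
	bkt, err := root.CreateBucketIfNotExists([]byte(alloc.ID))
	if err != nil {
		return err
	}
	buf, err := json.Marshal(alloc) // JSON is a stand-in encoding
	if err != nil {
		return err
	}
	return bkt.Put(allocRunnerStateAllocKey, buf)
}

// getAllAllocations mirrors GetAllAllocations: cursor over the sub-buckets
// and decode each stored alloc.
func getAllAllocations(tx *bolt.Tx) ([]*Allocation, error) {
	root := tx.Bucket(allocationsBucket)
	if root == nil {
		return nil, nil // no allocs
	}
	var allocs []*Allocation
	c := root.Cursor()
	for k, _ := c.First(); k != nil; k, _ = c.Next() {
		bkt := root.Bucket(k)
		if bkt == nil {
			return nil, fmt.Errorf("alloc %q missing", k)
		}
		var alloc Allocation
		if err := json.Unmarshal(bkt.Get(allocRunnerStateAllocKey), &alloc); err != nil {
			return nil, fmt.Errorf("failed to restore alloc %q: %v", k, err)
		}
		allocs = append(allocs, &alloc)
	}
	return allocs, nil
}

func main() {
	db, err := bolt.Open("state.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		return putAllocation(tx, &Allocation{ID: "alloc-1"})
	}); err != nil {
		log.Fatal(err)
	}

	if err := db.View(func(tx *bolt.Tx) error {
		allocs, err := getAllAllocations(tx)
		if err != nil {
			return err
		}
		fmt.Println("restored", len(allocs), "allocs")
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}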