diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index f1c33fb72..67df55177 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -31,16 +31,15 @@ type AllocRunner struct {
 	logger        *log.Logger
 	consulService *ConsulService

-	alloc     *structs.Allocation
-	allocLock sync.Mutex
-
-	// Explicit status of allocation. Set when there are failures
-	allocClientStatus      string
+	alloc                  *structs.Allocation
+	allocClientStatus      string // Explicit status of allocation. Set when there are failures
 	allocClientDescription string
+	allocLock              sync.Mutex

 	dirtyCh chan struct{}

 	ctx        *driver.ExecContext
+	ctxLock    sync.Mutex
 	tasks      map[string]*TaskRunner
 	taskStates map[string]*structs.TaskState
 	restored   map[string]struct{}
@@ -76,7 +75,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
 		consulService: consulService,
 		dirtyCh:       make(chan struct{}, 1),
 		tasks:         make(map[string]*TaskRunner),
-		taskStates:    alloc.TaskStates,
+		taskStates:    copyTaskStates(alloc.TaskStates),
 		restored:      make(map[string]struct{}),
 		updateCh:      make(chan *structs.Allocation, 8),
 		destroyCh:     make(chan struct{}),
@@ -112,7 +111,7 @@ func (r *AllocRunner) RestoreState() error {
 		r.restored[name] = struct{}{}

 		task := &structs.Task{Name: name}
-		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
+		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
 			task, r.consulService)
 		r.tasks[name] = tr

@@ -153,16 +152,27 @@
 }

 func (r *AllocRunner) saveAllocRunnerState() error {
+	// Create the snapshot.
 	r.taskStatusLock.RLock()
-	defer r.taskStatusLock.RUnlock()
+	states := copyTaskStates(r.taskStates)
+	r.taskStatusLock.RUnlock()
+
+	alloc := r.Alloc()
 	r.allocLock.Lock()
-	defer r.allocLock.Unlock()
+	allocClientStatus := r.allocClientStatus
+	allocClientDescription := r.allocClientDescription
+	r.allocLock.Unlock()
+
+	r.ctxLock.Lock()
+	ctx := r.ctx
+	r.ctxLock.Unlock()
+
 	snap := allocRunnerState{
-		Alloc:                  r.alloc,
-		Context:                r.ctx,
-		AllocClientStatus:      r.allocClientStatus,
-		AllocClientDescription: r.allocClientDescription,
-		TaskStates:             r.taskStates,
+		Alloc:                  alloc,
+		Context:                ctx,
+		AllocClientStatus:      allocClientStatus,
+		AllocClientDescription: allocClientDescription,
+		TaskStates:             states,
 	}
 	return persistState(r.stateFilePath(), &snap)
 }
@@ -186,16 +196,33 @@ func (r *AllocRunner) DestroyContext() error {
 	return r.ctx.AllocDir.Destroy()
 }

+// copyTaskStates returns a copy of the passed task states.
+func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
+	copy := make(map[string]*structs.TaskState, len(states))
+	for task, state := range states {
+		copy[task] = state.Copy()
+	}
+	return copy
+}
+
 // Alloc returns the associated allocation
 func (r *AllocRunner) Alloc() *structs.Allocation {
 	r.allocLock.Lock()
 	alloc := r.alloc.Copy()
+
+	// The status has explicitly been set.
+	if r.allocClientStatus != "" || r.allocClientDescription != "" {
+		alloc.ClientStatus = r.allocClientStatus
+		alloc.ClientDescription = r.allocClientDescription
+		r.allocLock.Unlock()
+		return alloc
+	}
 	r.allocLock.Unlock()

 	// Scan the task states to determine the status of the alloc
 	var pending, running, dead, failed bool
 	r.taskStatusLock.RLock()
-	alloc.TaskStates = r.taskStates
+	alloc.TaskStates = copyTaskStates(r.taskStates)
 	for _, state := range r.taskStates {
 		switch state.State {
 		case structs.TaskStateRunning:
@@ -213,13 +240,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
 	}
 	r.taskStatusLock.RUnlock()

-	// The status has explicitely been set.
-	if r.allocClientStatus != "" || r.allocClientDescription != "" {
-		alloc.ClientStatus = r.allocClientStatus
-		alloc.ClientDescription = r.allocClientDescription
-		return alloc
-	}
-
 	// Determine the alloc status
 	if failed {
 		alloc.ClientStatus = structs.AllocClientStatusFailed
@@ -276,8 +296,10 @@ func (r *AllocRunner) syncStatus() error {
 // setStatus is used to update the allocation status
 func (r *AllocRunner) setStatus(status, desc string) {
-	r.alloc.ClientStatus = status
-	r.alloc.ClientDescription = desc
+	r.allocLock.Lock()
+	r.allocClientStatus = status
+	r.allocClientDescription = desc
+	r.allocLock.Unlock()
 	select {
 	case r.dirtyCh <- struct{}{}:
 	default:
 	}
@@ -336,6 +358,7 @@
 	}

 	// Create the execution context
+	r.ctxLock.Lock()
 	if r.ctx == nil {
 		allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
 		if err := allocDir.Build(tg.Tasks); err != nil {
@@ -345,6 +368,7 @@
 		}
 		r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
 	}
+	r.ctxLock.Unlock()

 	// Check if the allocation is in a terminal status. In this case, we don't
 	// start any of the task runners and directly wait for the destroy signal to
@@ -364,8 +388,8 @@
 			continue
 		}

-		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
-			task, r.consulService)
+		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
+			task.Copy(), r.consulService)
 		r.tasks[task.Name] = tr
 		go tr.Run()
 	}
diff --git a/client/client.go b/client/client.go
index 2828b1468..2dbfd0ea0 100644
--- a/client/client.go
+++ b/client/client.go
@@ -72,8 +72,9 @@ func DefaultConfig() *config.Config {
 // are expected to register as a schedulable node to the servers, and to
 // run allocations as determined by the servers.
 type Client struct {
-	config *config.Config
-	start  time.Time
+	config     *config.Config
+	configLock sync.RWMutex
+	start      time.Time

 	logger *log.Logger

@@ -409,7 +410,9 @@ func (c *Client) restoreState() error {
 	for _, entry := range list {
 		id := entry.Name()
 		alloc := &structs.Allocation{ID: id}
-		ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
+		c.configLock.RLock()
+		ar := NewAllocRunner(c.logger, c.config.Copy(), c.updateAllocStatus, alloc, c.consulService)
+		c.configLock.RUnlock()
 		c.allocs[id] = ar
 		if err := ar.RestoreState(); err != nil {
 			c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
@@ -524,7 +527,10 @@ func (c *Client) fingerprint() error {
 		if err != nil {
 			return err
 		}
-		applies, err := fingerprint.FingerprintLocked(f, c.config, c.config.Node)
+
+		c.configLock.Lock()
+		applies, err := f.Fingerprint(c.config, c.config.Node)
+		c.configLock.Unlock()
 		if err != nil {
 			return err
 		}
@@ -552,9 +558,11 @@ func (c *Client) fingerprintPeriodic(name string, f fingerprint.Fingerprint, d t
 	for {
 		select {
 		case <-time.After(d):
-			if _, err := fingerprint.FingerprintLocked(f, c.config, c.config.Node); err != nil {
+			c.configLock.Lock()
+			if _, err := f.Fingerprint(c.config, c.config.Node); err != nil {
 				c.logger.Printf("[DEBUG] client: periodic fingerprinting for %v failed: %v", name, err)
 			}
+			c.configLock.Unlock()
 		case <-c.shutdownCh:
 			return
 		}
@@ -582,7 +590,9 @@ func (c *Client) setupDrivers() error {
 		if err != nil {
 			return err
 		}
-		applies, err := fingerprint.FingerprintLocked(d, c.config, c.config.Node)
+		c.configLock.Lock()
+		applies, err := d.Fingerprint(c.config, c.config.Node)
+		c.configLock.Unlock()
 		if err != nil {
 			return err
 		}
@@ -664,6 +674,8 @@ func (c *Client) run() {
 // determine if the node properties have changed. It returns the new hash values
 // in case they are different from the old hash values.
 func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
+	c.configLock.RLock()
+	defer c.configLock.RUnlock()
 	newAttrHash, err := hashstructure.Hash(c.config.Node.Attributes, nil)
 	if err != nil {
 		c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
@@ -919,7 +931,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
 	c.allocLock.RLock()
 	exist := make([]*structs.Allocation, 0, len(c.allocs))
 	for _, ar := range c.allocs {
-		exist = append(exist, ar.Alloc())
+		exist = append(exist, ar.alloc)
 	}
 	c.allocLock.RUnlock()

@@ -988,7 +1000,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
 func (c *Client) addAlloc(alloc *structs.Allocation) error {
 	c.allocLock.Lock()
 	defer c.allocLock.Unlock()
-	ar := NewAllocRunner(c.logger, c.config, c.updateAllocStatus, alloc, c.consulService)
+	c.configLock.RLock()
+	ar := NewAllocRunner(c.logger, c.config.Copy(), c.updateAllocStatus, alloc, c.consulService)
+	c.configLock.RUnlock()
 	c.allocs[alloc.ID] = ar
 	go ar.Run()
 	return nil
diff --git a/client/config/config.go b/client/config/config.go
index dd2c2b967..c6a81fdfb 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -8,6 +8,7 @@ import (
 	"time"

 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/mitchellh/copystructure"
 )

 // RPCHandler can be provided to the Client if there is a local server
@@ -72,6 +73,19 @@ type Config struct {
 	Options map[string]string
 }

+func (c *Config) Copy() *Config {
+	log := c.LogOutput
+	c.LogOutput = nil
+	i, err := copystructure.Copy(c)
+	c.LogOutput = log
+	if err != nil {
+		return nil
+	}
+	copy := i.(*Config)
+	copy.LogOutput = log
+	return copy
+}
+
 // Read returns the specified configuration value or "".
 func (c *Config) Read(id string) string {
 	val, ok := c.Options[id]
diff --git a/client/fingerprint/fingerprint.go b/client/fingerprint/fingerprint.go
index 111a7062e..c9195253b 100644
--- a/client/fingerprint/fingerprint.go
+++ b/client/fingerprint/fingerprint.go
@@ -3,7 +3,6 @@ package fingerprint
 import (
 	"fmt"
 	"log"
-	"sync"
 	"time"

 	"github.com/hashicorp/nomad/client/config"
@@ -28,12 +27,6 @@ var BuiltinFingerprints = []string{
 	"storage",
 }

-var (
-	// NodeLock ensures that only a single fingerprinter is running at a time
-	// when using the FingerprintLocked method.
-	NodeLock sync.Mutex
-)
-
 // builtinFingerprintMap contains the built in registered fingerprints
 // which are available, corresponding to a key found in BuiltinFingerprints
 var builtinFingerprintMap = map[string]Factory{
@@ -88,10 +81,3 @@ type StaticFingerprinter struct{}
 func (s *StaticFingerprinter) Periodic() (bool, time.Duration) {
 	return false, EmptyDuration
 }
-
-// FingerprintLocked is used to fingerprint in a thread-safe manner.
-func FingerprintLocked(f Fingerprint, config *config.Config, node *structs.Node) (bool, error) {
-	NodeLock.Lock()
-	defer NodeLock.Unlock()
-	return f.Fingerprint(config, node)
-}
diff --git a/client/task_runner.go b/client/task_runner.go
index 83a29bb65..f27a38107 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -29,9 +29,10 @@ type TaskRunner struct {
 	restartTracker *RestartTracker
 	consulService  *ConsulService

-	task     *structs.Task
-	updateCh chan *structs.Allocation
-	handle   driver.DriverHandle
+	task       *structs.Task
+	updateCh   chan *structs.Allocation
+	handle     driver.DriverHandle
+	handleLock sync.Mutex

 	destroy   bool
 	destroyCh chan struct{}
@@ -127,7 +128,9 @@ func (r *TaskRunner) RestoreState() error {
 				r.task.Name, r.alloc.ID, err)
 			return nil
 		}
+		r.handleLock.Lock()
 		r.handle = handle
+		r.handleLock.Unlock()
 	}
 	return nil
 }
@@ -139,9 +142,11 @@ func (r *TaskRunner) SaveState() error {
 	snap := taskRunnerState{
 		Task: r.task,
 	}
+	r.handleLock.Lock()
 	if r.handle != nil {
 		snap.HandleID = r.handle.ID()
 	}
+	r.handleLock.Unlock()
 	return persistState(r.stateFilePath(), &snap)
 }

@@ -163,7 +168,10 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
 // createDriver makes a driver for the task
 func (r *TaskRunner) createDriver() (driver.Driver, error) {
-	taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task)
+	// Create a copy of the node to guard against concurrent mutation of the
+	// shared node object (e.g. by the fingerprinters).
+	node := r.config.Node.Copy()
+	taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, node, r.task)
 	if err != nil {
 		err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
 			r.task.Driver, r.alloc.ID, err)
@@ -203,7 +211,9 @@ func (r *TaskRunner) startTask() error {
 		r.setState(structs.TaskStateDead, e)
 		return err
 	}
+	r.handleLock.Lock()
 	r.handle = handle
+	r.handleLock.Unlock()
 	r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
 	return nil
 }
@@ -222,7 +232,10 @@ func (r *TaskRunner) run() {
 	var forceStart bool
 	for {
 		// Start the task if not yet started or it is being forced.
-		if r.handle == nil || forceStart {
+		r.handleLock.Lock()
+		handleEmpty := r.handle == nil
+		r.handleLock.Unlock()
+		if handleEmpty || forceStart {
 			forceStart = false
 			if err := r.startTask(); err != nil {
 				return
 			}
@@ -339,11 +352,13 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {

 	// Update will update resources and store the new kill timeout.
 	var mErr multierror.Error
+	r.handleLock.Lock()
 	if r.handle != nil {
 		if err := r.handle.Update(updatedTask); err != nil {
 			mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
 		}
 	}
+	r.handleLock.Unlock()

 	// Update the restart policy.
 	if r.restartTracker != nil {
diff --git a/demo/vagrant/client1.hcl b/demo/vagrant/client1.hcl
index 3e8a2a0c7..61a46ebdf 100644
--- a/demo/vagrant/client1.hcl
+++ b/demo/vagrant/client1.hcl
@@ -1,9 +1,11 @@
-# Increase log verbosity
-log_level = "DEBUG"
+# Set log verbosity
+log_level = "INFO"

 # Setup data dir
 data_dir = "/tmp/client1"

+enable_debug = true
+
 # Enable the client
 client {
     enabled = true
@@ -13,6 +15,9 @@
     # like Consul used for service discovery.
     servers = ["127.0.0.1:4647"]
     node_class = "foo"
+    options {
+        "driver.raw_exec.enable" = "1"
+    }
 }

 # Modify our port to avoid a collision with server1
diff --git a/demo/vagrant/server.hcl b/demo/vagrant/server.hcl
index 653b2c037..280f44a66 100644
--- a/demo/vagrant/server.hcl
+++ b/demo/vagrant/server.hcl
@@ -1,5 +1,5 @@
-# Increase log verbosity
-log_level = "DEBUG"
+# Set log verbosity
+log_level = "INFO"

 # Setup data dir
 data_dir = "/tmp/server1"
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index e16250941..da544b346 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -542,6 +542,14 @@ type Node struct {
 	ModifyIndex uint64
 }

+func (n *Node) Copy() *Node {
+	i, err := copystructure.Copy(n)
+	if err != nil {
+		return nil
+	}
+	return i.(*Node)
+}
+
 // TerminalStatus returns if the current status is terminal and
 // will no longer transition.
 func (n *Node) TerminalStatus() bool {
@@ -1478,6 +1486,14 @@ type Task struct {
 	KillTimeout time.Duration `mapstructure:"kill_timeout"`
 }

+func (t *Task) Copy() *Task {
+	i, err := copystructure.Copy(t)
+	if err != nil {
+		return nil
+	}
+	return i.(*Task)
+}
+
 // InitFields initializes fields in the task.
 func (t *Task) InitFields(job *Job, tg *TaskGroup) {
 	t.InitServiceFields(job.Name, tg.Name)
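The thread running through these hunks is one discipline: never hand a pointer to shared mutable state (r.alloc, c.config, r.handle) across a goroutine boundary; instead, snapshot it under a narrowly scoped lock and return a deep copy. Below is a minimal, self-contained sketch of that pattern. The Tracker and State names are hypothetical stand-ins for AllocRunner and structs.TaskState, not part of this patch; it reuses the same github.com/mitchellh/copystructure dependency the patch itself adds.

package main

import (
	"fmt"
	"sync"

	"github.com/mitchellh/copystructure"
)

// State stands in for structs.TaskState.
type State struct {
	Status string
}

// Copy returns a deep copy of the state, mirroring the Node.Copy and
// Task.Copy helpers added in the patch. It returns nil if the copy fails.
func (s *State) Copy() *State {
	i, err := copystructure.Copy(s)
	if err != nil {
		return nil
	}
	return i.(*State)
}

// Tracker stands in for AllocRunner: mutable state guarded by a mutex.
type Tracker struct {
	mu     sync.Mutex
	states map[string]*State
}

// Snapshot copies the map while holding the lock and releases it right
// away, so callers can read or persist the result without racing
// concurrent writers, the same shape as saveAllocRunnerState above.
func (t *Tracker) Snapshot() map[string]*State {
	t.mu.Lock()
	defer t.mu.Unlock()
	out := make(map[string]*State, len(t.states))
	for name, s := range t.states {
		out[name] = s.Copy()
	}
	return out
}

func main() {
	t := &Tracker{states: map[string]*State{"web": {Status: "running"}}}
	snap := t.Snapshot()
	snap["web"].Status = "dead"         // mutating the snapshot...
	fmt.Println(t.states["web"].Status) // ...leaves the original: "running"
}

Returning a copy keeps lock hold times short and makes a persisted snapshot immune to later mutation, at the cost of an allocation per read; that is the same trade the patch makes in Alloc() and saveAllocRunnerState().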