diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index b4b47326f..8b1ace5f4 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -56,9 +56,7 @@ type allocRunner struct { alloc *structs.Allocation allocLock sync.RWMutex - //XXX implement for local state - // state captures the state of the alloc runner - state *state.State + state *state.State // alloc runner state stateLock sync.RWMutex stateDB cstate.StateDB @@ -305,20 +303,22 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st ar.stateLock.RLock() defer ar.stateLock.RUnlock() + // store task states for AllocState to expose + ar.state.TaskStates = taskStates + a := &structs.Allocation{ ID: ar.id, TaskStates: taskStates, } - s := ar.state - if d := s.DeploymentStatus; d != nil { + if d := ar.state.DeploymentStatus; d != nil { a.DeploymentStatus = d.Copy() } // Compute the ClientStatus - if s.ClientStatus != "" { + if ar.state.ClientStatus != "" { // The client status is being forced - a.ClientStatus, a.ClientDescription = s.ClientStatus, s.ClientDescription + a.ClientStatus, a.ClientDescription = ar.state.ClientStatus, ar.state.ClientDescription } else { a.ClientStatus, a.ClientDescription = getClientStatus(taskStates) } @@ -388,6 +388,27 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript return "", "" } +// AllocState returns a copy of allocation state including a snapshot of task +// states. +func (ar *allocRunner) AllocState() *state.State { + // Must acquire write-lock in case TaskStates needs to be set. + ar.stateLock.Lock() + defer ar.stateLock.Unlock() + + // If TaskStateUpdated has not been called yet, ar.state.TaskStates + // won't be set as it is not the canonical source of TaskStates. + if len(ar.state.TaskStates) == 0 { + ar.tasksLock.RLock() + ar.state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks)) + for k, tr := range ar.tasks { + ar.state.TaskStates[k] = tr.TaskState() + } + ar.tasksLock.RUnlock() + } + + return ar.state.Copy() +} + // Update the running allocation with a new version received from the server. // // This method sends the updated alloc to Run for serially processing updates. diff --git a/client/allocrunnerv2/state/state.go b/client/allocrunnerv2/state/state.go index 85ec7c3da..9754e369c 100644 --- a/client/allocrunnerv2/state/state.go +++ b/client/allocrunnerv2/state/state.go @@ -6,7 +6,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// XXX Why its own package? // State captures the state of the allocation runner. type State struct { // ClientStatus captures the overall state of the allocation @@ -18,6 +17,9 @@ type State struct { // DeploymentStatus captures the status of the deployment DeploymentStatus *structs.AllocDeploymentStatus + + // TaskStates is a snapshot of task states. + TaskStates map[string]*structs.TaskState } // SetDeploymentStatus is a helper for updating the client-controlled @@ -43,3 +45,17 @@ func (s *State) ClearDeploymentStatus() { s.DeploymentStatus.Healthy = nil s.DeploymentStatus.Timestamp = time.Time{} } + +// Copy returns a deep copy of State. +func (s *State) Copy() *State { + taskStates := make(map[string]*structs.TaskState, len(s.TaskStates)) + for k, v := range s.TaskStates { + taskStates[k] = v.Copy() + } + return &State{ + ClientStatus: s.ClientStatus, + ClientDescription: s.ClientDescription, + DeploymentStatus: s.DeploymentStatus.Copy(), + TaskStates: taskStates, + } +} diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index 84578621b..c206860e2 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -177,17 +177,16 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { ) tr := &TaskRunner{ - alloc: config.Alloc, - allocID: config.Alloc.ID, - clientConfig: config.ClientConfig, - task: config.Task, - taskDir: config.TaskDir, - taskName: config.Task.Name, - envBuilder: envBuilder, - consulClient: config.Consul, - vaultClient: config.VaultClient, - //XXX Make a Copy to avoid races? - state: config.Alloc.TaskStates[config.Task.Name], + alloc: config.Alloc, + allocID: config.Alloc.ID, + clientConfig: config.ClientConfig, + task: config.Task, + taskDir: config.TaskDir, + taskName: config.Task.Name, + envBuilder: envBuilder, + consulClient: config.Consul, + vaultClient: config.VaultClient, + state: config.Alloc.TaskStates[config.Task.Name].Copy(), localState: config.LocalState, stateDB: config.StateDB, stateUpdater: config.StateUpdater, diff --git a/client/client.go b/client/client.go index 1056520fd..2fbeaf7c5 100644 --- a/client/client.go +++ b/client/client.go @@ -18,6 +18,7 @@ import ( consulapi "github.com/hashicorp/consul/api" hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + arstate "github.com/hashicorp/nomad/client/allocrunnerv2/state" consulApi "github.com/hashicorp/nomad/client/consul" cstructs "github.com/hashicorp/nomad/client/structs" hstats "github.com/hashicorp/nomad/helper/stats" @@ -103,6 +104,7 @@ type ClientStatsReporter interface { //TODO Create via factory to allow testing Client with mock AllocRunners. type AllocRunner interface { Alloc() *structs.Allocation + AllocState() *arstate.State Destroy() GetAllocDir() *allocdir.AllocDir IsDestroyed() bool @@ -637,8 +639,9 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { return ar.GetAllocDir(), nil } -// GetClientAlloc returns the allocation from the client -func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) { +// GetAllocState returns a copy of an allocation's state on this client. It +// returns either an AllocState or an unknown allocation error. +func (c *Client) GetAllocState(allocID string) (*arstate.State, error) { c.allocLock.RLock() ar, ok := c.allocs[allocID] c.allocLock.RUnlock() @@ -646,7 +649,7 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) { return nil, structs.NewErrUnknownAllocation(allocID) } - return ar.Alloc(), nil + return ar.AllocState(), nil } // GetServers returns the list of nomad servers this client is aware of. diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 95f1a9695..ded3ae0aa 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -380,7 +380,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { return } - alloc, err := f.c.GetClientAlloc(req.AllocID) + allocState, err := f.c.GetAllocState(req.AllocID) if err != nil { code := helper.Int64ToPtr(500) if structs.IsErrUnknownAllocation(err) { @@ -392,19 +392,23 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { } // Check that the task is there - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil { - f.handleStreamResultError(fmt.Errorf("Failed to lookup task group for allocation"), - helper.Int64ToPtr(500), encoder) - return - } else if taskStruct := tg.LookupTask(req.Task); taskStruct == nil { + taskState := allocState.TaskStates[req.Task] + if taskState == nil { f.handleStreamResultError( - fmt.Errorf("task group %q does not have task with name %q", alloc.TaskGroup, req.Task), + fmt.Errorf("unknown task name %q", req.Task), helper.Int64ToPtr(400), encoder) return } + if taskState.StartedAt.IsZero() { + f.handleStreamResultError( + fmt.Errorf("task %q not started yet. No logs available", req.Task), + helper.Int64ToPtr(404), + encoder) + return + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/command/agent/helpers.go b/command/agent/helpers.go index 50542416c..7248ea5f8 100644 --- a/command/agent/helpers.go +++ b/command/agent/helpers.go @@ -12,7 +12,7 @@ func (s *HTTPServer) rpcHandlerForAlloc(allocID string) (localClient, remoteClie if c != nil { // If there is an error it means that the client doesn't have the // allocation so we can't use the local client - _, err := c.GetClientAlloc(allocID) + _, err := c.GetAllocState(allocID) if err == nil { localAlloc = true }