mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
client: expose task state to client
The interesting decision in this commit was to expose AR's state and not a fully materialized Allocation struct. AR.clientAlloc builds an Alloc that contains the task state, so I considered simply memoizing and exposing that method. However, that would lead to AR having two awkwardly similar methods: - Alloc() - which returns the server-sent alloc - ClientAlloc() - which returns the fully materialized client alloc Since ClientAlloc() could be memoized it would be just as cheap to call as Alloc(), so why not replace Alloc() entirely? Replacing Alloc() entirely would require Update() to immediately materialize the task states on server-sent Allocs as there may have been local task state changes since the server received an Alloc update. This quickly becomes difficult to reason about: should Update hooks use the TaskStates? Are state changes caused by TR Update hooks immediately reflected in the Alloc? Should AR persist its copy of the Alloc? If so, are its TaskStates canonical or the TaskStates on TR? So! Forget that. Let's separate the static Allocation from the dynamic AR & TR state! - AR.Alloc() is for static Allocation access (often for the Job) - AR.AllocState() is for the dynamic AR & TR runtime state (deployment status, task states, etc). If code needs to know the status of a task: AllocState() If code needs to know the names of tasks: Alloc() It should be very easy for a developer to reason about which method they should call and what they can do with the return values.
This commit is contained in:
@@ -56,9 +56,7 @@ type allocRunner struct {
|
||||
alloc *structs.Allocation
|
||||
allocLock sync.RWMutex
|
||||
|
||||
//XXX implement for local state
|
||||
// state captures the state of the alloc runner
|
||||
state *state.State
|
||||
state *state.State // alloc runner state
|
||||
stateLock sync.RWMutex
|
||||
|
||||
stateDB cstate.StateDB
|
||||
@@ -305,20 +303,22 @@ func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *st
|
||||
ar.stateLock.RLock()
|
||||
defer ar.stateLock.RUnlock()
|
||||
|
||||
// store task states for AllocState to expose
|
||||
ar.state.TaskStates = taskStates
|
||||
|
||||
a := &structs.Allocation{
|
||||
ID: ar.id,
|
||||
TaskStates: taskStates,
|
||||
}
|
||||
|
||||
s := ar.state
|
||||
if d := s.DeploymentStatus; d != nil {
|
||||
if d := ar.state.DeploymentStatus; d != nil {
|
||||
a.DeploymentStatus = d.Copy()
|
||||
}
|
||||
|
||||
// Compute the ClientStatus
|
||||
if s.ClientStatus != "" {
|
||||
if ar.state.ClientStatus != "" {
|
||||
// The client status is being forced
|
||||
a.ClientStatus, a.ClientDescription = s.ClientStatus, s.ClientDescription
|
||||
a.ClientStatus, a.ClientDescription = ar.state.ClientStatus, ar.state.ClientDescription
|
||||
} else {
|
||||
a.ClientStatus, a.ClientDescription = getClientStatus(taskStates)
|
||||
}
|
||||
@@ -388,6 +388,27 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript
|
||||
return "", ""
|
||||
}
|
||||
|
||||
// AllocState returns a copy of allocation state including a snapshot of task
|
||||
// states.
|
||||
func (ar *allocRunner) AllocState() *state.State {
|
||||
// Must acquire write-lock in case TaskStates needs to be set.
|
||||
ar.stateLock.Lock()
|
||||
defer ar.stateLock.Unlock()
|
||||
|
||||
// If TaskStateUpdated has not been called yet, ar.state.TaskStates
|
||||
// won't be set as it is not the canonical source of TaskStates.
|
||||
if len(ar.state.TaskStates) == 0 {
|
||||
ar.tasksLock.RLock()
|
||||
ar.state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks))
|
||||
for k, tr := range ar.tasks {
|
||||
ar.state.TaskStates[k] = tr.TaskState()
|
||||
}
|
||||
ar.tasksLock.RUnlock()
|
||||
}
|
||||
|
||||
return ar.state.Copy()
|
||||
}
|
||||
|
||||
// Update the running allocation with a new version received from the server.
|
||||
//
|
||||
// This method sends the updated alloc to Run for serially processing updates.
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// XXX Why its own package?
|
||||
// State captures the state of the allocation runner.
|
||||
type State struct {
|
||||
// ClientStatus captures the overall state of the allocation
|
||||
@@ -18,6 +17,9 @@ type State struct {
|
||||
|
||||
// DeploymentStatus captures the status of the deployment
|
||||
DeploymentStatus *structs.AllocDeploymentStatus
|
||||
|
||||
// TaskStates is a snapshot of task states.
|
||||
TaskStates map[string]*structs.TaskState
|
||||
}
|
||||
|
||||
// SetDeploymentStatus is a helper for updating the client-controlled
|
||||
@@ -43,3 +45,17 @@ func (s *State) ClearDeploymentStatus() {
|
||||
s.DeploymentStatus.Healthy = nil
|
||||
s.DeploymentStatus.Timestamp = time.Time{}
|
||||
}
|
||||
|
||||
// Copy returns a deep copy of State.
|
||||
func (s *State) Copy() *State {
|
||||
taskStates := make(map[string]*structs.TaskState, len(s.TaskStates))
|
||||
for k, v := range s.TaskStates {
|
||||
taskStates[k] = v.Copy()
|
||||
}
|
||||
return &State{
|
||||
ClientStatus: s.ClientStatus,
|
||||
ClientDescription: s.ClientDescription,
|
||||
DeploymentStatus: s.DeploymentStatus.Copy(),
|
||||
TaskStates: taskStates,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,17 +177,16 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||
)
|
||||
|
||||
tr := &TaskRunner{
|
||||
alloc: config.Alloc,
|
||||
allocID: config.Alloc.ID,
|
||||
clientConfig: config.ClientConfig,
|
||||
task: config.Task,
|
||||
taskDir: config.TaskDir,
|
||||
taskName: config.Task.Name,
|
||||
envBuilder: envBuilder,
|
||||
consulClient: config.Consul,
|
||||
vaultClient: config.VaultClient,
|
||||
//XXX Make a Copy to avoid races?
|
||||
state: config.Alloc.TaskStates[config.Task.Name],
|
||||
alloc: config.Alloc,
|
||||
allocID: config.Alloc.ID,
|
||||
clientConfig: config.ClientConfig,
|
||||
task: config.Task,
|
||||
taskDir: config.TaskDir,
|
||||
taskName: config.Task.Name,
|
||||
envBuilder: envBuilder,
|
||||
consulClient: config.Consul,
|
||||
vaultClient: config.VaultClient,
|
||||
state: config.Alloc.TaskStates[config.Task.Name].Copy(),
|
||||
localState: config.LocalState,
|
||||
stateDB: config.StateDB,
|
||||
stateUpdater: config.StateUpdater,
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
arstate "github.com/hashicorp/nomad/client/allocrunnerv2/state"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
hstats "github.com/hashicorp/nomad/helper/stats"
|
||||
@@ -103,6 +104,7 @@ type ClientStatsReporter interface {
|
||||
//TODO Create via factory to allow testing Client with mock AllocRunners.
|
||||
type AllocRunner interface {
|
||||
Alloc() *structs.Allocation
|
||||
AllocState() *arstate.State
|
||||
Destroy()
|
||||
GetAllocDir() *allocdir.AllocDir
|
||||
IsDestroyed() bool
|
||||
@@ -637,8 +639,9 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
|
||||
return ar.GetAllocDir(), nil
|
||||
}
|
||||
|
||||
// GetClientAlloc returns the allocation from the client
|
||||
func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) {
|
||||
// GetAllocState returns a copy of an allocation's state on this client. It
|
||||
// returns either an AllocState or an unknown allocation error.
|
||||
func (c *Client) GetAllocState(allocID string) (*arstate.State, error) {
|
||||
c.allocLock.RLock()
|
||||
ar, ok := c.allocs[allocID]
|
||||
c.allocLock.RUnlock()
|
||||
@@ -646,7 +649,7 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) {
|
||||
return nil, structs.NewErrUnknownAllocation(allocID)
|
||||
}
|
||||
|
||||
return ar.Alloc(), nil
|
||||
return ar.AllocState(), nil
|
||||
}
|
||||
|
||||
// GetServers returns the list of nomad servers this client is aware of.
|
||||
|
||||
@@ -380,7 +380,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
|
||||
return
|
||||
}
|
||||
|
||||
alloc, err := f.c.GetClientAlloc(req.AllocID)
|
||||
allocState, err := f.c.GetAllocState(req.AllocID)
|
||||
if err != nil {
|
||||
code := helper.Int64ToPtr(500)
|
||||
if structs.IsErrUnknownAllocation(err) {
|
||||
@@ -392,19 +392,23 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
|
||||
}
|
||||
|
||||
// Check that the task is there
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
f.handleStreamResultError(fmt.Errorf("Failed to lookup task group for allocation"),
|
||||
helper.Int64ToPtr(500), encoder)
|
||||
return
|
||||
} else if taskStruct := tg.LookupTask(req.Task); taskStruct == nil {
|
||||
taskState := allocState.TaskStates[req.Task]
|
||||
if taskState == nil {
|
||||
f.handleStreamResultError(
|
||||
fmt.Errorf("task group %q does not have task with name %q", alloc.TaskGroup, req.Task),
|
||||
fmt.Errorf("unknown task name %q", req.Task),
|
||||
helper.Int64ToPtr(400),
|
||||
encoder)
|
||||
return
|
||||
}
|
||||
|
||||
if taskState.StartedAt.IsZero() {
|
||||
f.handleStreamResultError(
|
||||
fmt.Errorf("task %q not started yet. No logs available", req.Task),
|
||||
helper.Int64ToPtr(404),
|
||||
encoder)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ func (s *HTTPServer) rpcHandlerForAlloc(allocID string) (localClient, remoteClie
|
||||
if c != nil {
|
||||
// If there is an error it means that the client doesn't have the
|
||||
// allocation so we can't use the local client
|
||||
_, err := c.GetClientAlloc(allocID)
|
||||
_, err := c.GetAllocState(allocID)
|
||||
if err == nil {
|
||||
localAlloc = true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user