mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Edge trigger evaluation when allocations client status is failed
This commit is contained in:
@@ -1333,6 +1333,8 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
|
||||
// send the fields that are updatable by the client.
|
||||
stripped := new(structs.Allocation)
|
||||
stripped.ID = alloc.ID
|
||||
stripped.JobID = alloc.JobID
|
||||
stripped.Namespace = alloc.Namespace
|
||||
stripped.NodeID = c.NodeID()
|
||||
stripped.TaskStates = alloc.TaskStates
|
||||
stripped.ClientStatus = alloc.ClientStatus
|
||||
|
||||
15
nomad/fsm.go
15
nomad/fsm.go
@@ -476,13 +476,16 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
return n.upsertEvals(index, req.Evals)
|
||||
}
|
||||
|
||||
if err := n.state.UpsertEvals(index, req.Evals); err != nil {
|
||||
func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error {
|
||||
if err := n.state.UpsertEvals(index, evals); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: UpsertEvals failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, eval := range req.Evals {
|
||||
for _, eval := range evals {
|
||||
if eval.ShouldEnqueue() {
|
||||
n.evalBroker.Enqueue(eval)
|
||||
} else if eval.ShouldBlock() {
|
||||
@@ -582,6 +585,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
|
||||
return err
|
||||
}
|
||||
|
||||
// Update any evals
|
||||
if len(req.Evals) > 0 {
|
||||
if err := n.upsertEvals(index, req.Evals); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Unblock evals for the nodes computed node class if the client has
|
||||
// finished running an allocation.
|
||||
for _, alloc := range req.Alloc {
|
||||
|
||||
@@ -820,10 +820,48 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
|
||||
return fmt.Errorf("must update at least one allocation")
|
||||
}
|
||||
|
||||
// Ensure that evals field is empty.
|
||||
if len(args.Evals) != 0 {
|
||||
return fmt.Errorf("evals field must not be set ")
|
||||
}
|
||||
|
||||
// Update modified timestamp for client initiated allocation updates
|
||||
now := time.Now().UTC().UnixNano()
|
||||
var evals []*structs.Evaluation
|
||||
|
||||
for _, alloc := range args.Alloc {
|
||||
alloc.ModifyTime = now
|
||||
|
||||
// Add an evaluation if this is a failed alloc that is eligible for rescheduling
|
||||
if alloc.ClientStatus == structs.AllocClientStatusFailed {
|
||||
ws := memdb.NewWatchSet()
|
||||
job, err := n.srv.State().JobByID(ws, alloc.Namespace, alloc.JobID)
|
||||
if err != nil {
|
||||
n.srv.logger.Printf("[ERR] nomad.client: Unable to find jobid %v", alloc.JobID)
|
||||
return err
|
||||
}
|
||||
if job == nil {
|
||||
return fmt.Errorf("[ERR] nomad.client: Unable to find jobid %v", alloc.JobID)
|
||||
}
|
||||
// Only create evaluations if this is an existing alloc and eligible as per its task group's ReschedulePolicy
|
||||
if existingAlloc, _ := n.srv.State().AllocByID(ws, alloc.ID); existingAlloc != nil {
|
||||
taskGroup := job.LookupTaskGroup(existingAlloc.TaskGroup)
|
||||
if taskGroup != nil && existingAlloc.RescheduleEligible(taskGroup.ReschedulePolicy) {
|
||||
eval := &structs.Evaluation{
|
||||
ID: uuid.Generate(),
|
||||
Namespace: alloc.Namespace,
|
||||
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
|
||||
JobID: alloc.JobID,
|
||||
Type: job.Type,
|
||||
Status: structs.EvalStatusPending,
|
||||
}
|
||||
evals = append(evals, eval)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(evals) > 0 {
|
||||
n.srv.logger.Printf("[DEBUG] nomad.client: Adding %v evaluations for rescheduling", len(evals))
|
||||
}
|
||||
// Add this to the batch
|
||||
n.updatesLock.Lock()
|
||||
@@ -845,7 +883,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
|
||||
n.updatesLock.Unlock()
|
||||
|
||||
// Perform the batch update
|
||||
n.batchUpdate(future, updates)
|
||||
n.batchUpdate(future, updates, evals)
|
||||
})
|
||||
}
|
||||
n.updatesLock.Unlock()
|
||||
@@ -861,10 +899,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
|
||||
}
|
||||
|
||||
// batchUpdate is used to update all the allocations
|
||||
func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) {
|
||||
func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
|
||||
// Prepare the batch update
|
||||
batch := &structs.AllocUpdateRequest{
|
||||
Alloc: updates,
|
||||
Evals: evals,
|
||||
WriteRequest: structs.WriteRequest{Region: n.srv.config.Region},
|
||||
}
|
||||
|
||||
|
||||
@@ -1662,7 +1662,7 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Inject fake evaluations
|
||||
// Inject fake allocations
|
||||
alloc := mock.Alloc()
|
||||
alloc.NodeID = node.ID
|
||||
state := s1.fsm.State()
|
||||
@@ -1672,6 +1672,14 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Inject mock job
|
||||
job := mock.Job()
|
||||
job.ID = alloc.JobID
|
||||
err = state.UpsertJob(101, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Attempt update
|
||||
clientAlloc := new(structs.Allocation)
|
||||
*clientAlloc = *alloc
|
||||
@@ -1747,7 +1755,7 @@ func TestClientEndpoint_BatchUpdate(t *testing.T) {
|
||||
// Call to do the batch update
|
||||
bf := NewBatchFuture()
|
||||
endpoint := s1.endpoints.Node
|
||||
endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc})
|
||||
endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc}, nil)
|
||||
if err := bf.Wait(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -1806,6 +1814,14 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Inject mock job
|
||||
job := mock.Job()
|
||||
job.ID = alloc.JobID
|
||||
err := state.UpsertJob(101, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Attempt update
|
||||
clientAlloc := new(structs.Allocation)
|
||||
*clientAlloc = *alloc
|
||||
|
||||
@@ -393,7 +393,7 @@ func correctDeploymentCanaries(result *structs.PlanResult) {
|
||||
}
|
||||
}
|
||||
|
||||
// evaluateNodePlan is used to evalute the plan for a single node,
|
||||
// evaluateNodePlan is used to evaluate the plan for a single node,
|
||||
// returning if the plan is valid or if an error is encountered
|
||||
func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID string) (bool, string, error) {
|
||||
// If this is an evict-only plan, it always 'fits' since we are removing things.
|
||||
|
||||
@@ -533,6 +533,10 @@ type AllocUpdateRequest struct {
|
||||
// Alloc is the list of new allocations to assign
|
||||
Alloc []*Allocation
|
||||
|
||||
// Evals is the list of new evaluations to create
|
||||
// Evals are valid only when used in the Raft RPC
|
||||
Evals []*Evaluation
|
||||
|
||||
// Job is the shared parent job of the allocations.
|
||||
// It is pulled out since it is common to reduce payload size.
|
||||
Job *Job
|
||||
|
||||
Reference in New Issue
Block a user