diff --git a/client/client.go b/client/client.go index 314b9dda4..8bf2ce40e 100644 --- a/client/client.go +++ b/client/client.go @@ -1333,6 +1333,8 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { // send the fields that are updatable by the client. stripped := new(structs.Allocation) stripped.ID = alloc.ID + stripped.JobID = alloc.JobID + stripped.Namespace = alloc.Namespace stripped.NodeID = c.NodeID() stripped.TaskStates = alloc.TaskStates stripped.ClientStatus = alloc.ClientStatus diff --git a/nomad/fsm.go b/nomad/fsm.go index 61c14bfe4..ec1d81a38 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -476,13 +476,16 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + return n.upsertEvals(index, req.Evals) +} - if err := n.state.UpsertEvals(index, req.Evals); err != nil { +func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error { + if err := n.state.UpsertEvals(index, evals); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpsertEvals failed: %v", err) return err } - for _, eval := range req.Evals { + for _, eval := range evals { if eval.ShouldEnqueue() { n.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { @@ -582,6 +585,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return err } + // Update any evals + if len(req.Evals) > 0 { + if err := n.upsertEvals(index, req.Evals); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err) + return err + } + } + // Unblock evals for the nodes computed node class if the client has // finished running an allocation. for _, alloc := range req.Alloc { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 7f4265fb9..a31269c8e 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -820,10 +820,48 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene return fmt.Errorf("must update at least one allocation") } + // Ensure that evals field is empty. + if len(args.Evals) != 0 { + return fmt.Errorf("evals field must not be set ") + } + // Update modified timestamp for client initiated allocation updates now := time.Now().UTC().UnixNano() + var evals []*structs.Evaluation + for _, alloc := range args.Alloc { alloc.ModifyTime = now + + // Add an evaluation if this is a failed alloc that is eligible for rescheduling + if alloc.ClientStatus == structs.AllocClientStatusFailed { + ws := memdb.NewWatchSet() + job, err := n.srv.State().JobByID(ws, alloc.Namespace, alloc.JobID) + if err != nil { + n.srv.logger.Printf("[ERR] nomad.client: Unable to find jobid %v", alloc.JobID) + return err + } + if job == nil { + return fmt.Errorf("[ERR] nomad.client: Unable to find jobid %v", alloc.JobID) + } + // Only create evaluations if this is an existing alloc and eligible as per its task group's ReschedulePolicy + if existingAlloc, _ := n.srv.State().AllocByID(ws, alloc.ID); existingAlloc != nil { + taskGroup := job.LookupTaskGroup(existingAlloc.TaskGroup) + if taskGroup != nil && existingAlloc.RescheduleEligible(taskGroup.ReschedulePolicy) { + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + JobID: alloc.JobID, + Type: job.Type, + Status: structs.EvalStatusPending, + } + evals = append(evals, eval) + } + } + } + } + if len(evals) > 0 { + n.srv.logger.Printf("[DEBUG] nomad.client: Adding %v evaluations for rescheduling", len(evals)) } // Add this to the batch n.updatesLock.Lock() @@ -845,7 +883,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene n.updatesLock.Unlock() // Perform the batch update - n.batchUpdate(future, updates) + n.batchUpdate(future, updates, evals) }) } n.updatesLock.Unlock() @@ -861,10 +899,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene } // batchUpdate is used to update all the allocations -func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) { +func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) { // Prepare the batch update batch := &structs.AllocUpdateRequest{ Alloc: updates, + Evals: evals, WriteRequest: structs.WriteRequest{Region: n.srv.config.Region}, } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 909e2a637..eee73885f 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1662,7 +1662,7 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - // Inject fake evaluations + // Inject fake allocations alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() @@ -1672,6 +1672,14 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { t.Fatalf("err: %v", err) } + // Inject mock job + job := mock.Job() + job.ID = alloc.JobID + err = state.UpsertJob(101, job) + if err != nil { + t.Fatalf("err: %v", err) + } + // Attempt update clientAlloc := new(structs.Allocation) *clientAlloc = *alloc @@ -1747,7 +1755,7 @@ func TestClientEndpoint_BatchUpdate(t *testing.T) { // Call to do the batch update bf := NewBatchFuture() endpoint := s1.endpoints.Node - endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc}) + endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc}, nil) if err := bf.Wait(); err != nil { t.Fatalf("err: %v", err) } @@ -1806,6 +1814,14 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) { t.Fatalf("err: %v", err) } + // Inject mock job + job := mock.Job() + job.ID = alloc.JobID + err := state.UpsertJob(101, job) + if err != nil { + t.Fatalf("err: %v", err) + } + // Attempt update clientAlloc := new(structs.Allocation) *clientAlloc = *alloc diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 44f78e2c8..149661694 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -393,7 +393,7 @@ func correctDeploymentCanaries(result *structs.PlanResult) { } } -// evaluateNodePlan is used to evalute the plan for a single node, +// evaluateNodePlan is used to evaluate the plan for a single node, // returning if the plan is valid or if an error is encountered func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID string) (bool, string, error) { // If this is an evict-only plan, it always 'fits' since we are removing things. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 57e3fdb91..2a894d7a3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -533,6 +533,10 @@ type AllocUpdateRequest struct { // Alloc is the list of new allocations to assign Alloc []*Allocation + // Evals is the list of new evaluations to create + // Evals are valid only when used in the Raft RPC + Evals []*Evaluation + // Job is the shared parent job of the allocations. // It is pulled out since it is common to reduce payload size. Job *Job