Merge branch 'main' into f-NMD-763-identity

James Rasell
2025-07-17 07:35:16 +01:00
696 changed files with 26311 additions and 6252 deletions

View File

@@ -730,16 +730,21 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.preKillHooks()
// generate task event for given task runner
taskEventFn := func(tr *taskrunner.TaskRunner) (te *structs.TaskEvent) {
te = structs.NewTaskEvent(structs.TaskKilling).
taskEventFn := func(tr *taskrunner.TaskRunner) *structs.TaskEvent {
// if the task has already finished, do not
// generate an event
if !tr.TaskState().FinishedAt.IsZero() {
return nil
}
te := structs.NewTaskEvent(structs.TaskKilling).
SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
// if the task is not set failed, the task has not finished,
// the job type is batch, and the allocation is being migrated
// then mark the task as failed. this ensures the task is recreated
// if no eligible nodes are immediately available.
// if the task is not set failed, the job type is batch, and the
// allocation is being migrated then mark the task as failed. this
// ensures the task is recreated if no eligible nodes are immediately
// available.
if !tr.TaskState().Failed &&
tr.TaskState().FinishedAt.IsZero() &&
ar.alloc.Job.Type == structs.JobTypeBatch &&
ar.alloc.DesiredTransition.Migrate != nil &&
*ar.alloc.DesiredTransition.Migrate {
@@ -747,7 +752,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
te.SetFailsTask()
}
return
return te
}
// Kill leader first, synchronously
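For readability, the new taskEventFn reads roughly as follows once the added lines above are stitched together (one unchanged line between the two hunks is not shown in the diff, so treat this as a sketch rather than the exact source):

taskEventFn := func(tr *taskrunner.TaskRunner) *structs.TaskEvent {
	// if the task has already finished, do not generate an event
	if !tr.TaskState().FinishedAt.IsZero() {
		return nil
	}
	te := structs.NewTaskEvent(structs.TaskKilling).
		SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
	// a migrating batch task that is not already failed is marked failed so
	// it is recreated if no eligible nodes are immediately available
	if !tr.TaskState().Failed &&
		ar.alloc.Job.Type == structs.JobTypeBatch &&
		ar.alloc.DesiredTransition.Migrate != nil &&
		*ar.alloc.DesiredTransition.Migrate {
		ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
		te.SetFailsTask()
	}
	return te
}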

View File

@@ -12,6 +12,7 @@ import (
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -254,6 +255,7 @@ func (ar *allocRunner) update(update *structs.Allocation) error {
}
// postrun is used to run the runners postrun hooks.
// all hooks will run even if any of them fail; the combined result is returned as a multierror, a single error, or nil.
func (ar *allocRunner) postrun() error {
if ar.logger.IsTrace() {
start := time.Now()
@@ -264,6 +266,7 @@ func (ar *allocRunner) postrun() error {
}()
}
var merr multierror.Error
for _, hook := range ar.runnerHooks {
post, ok := hook.(interfaces.RunnerPostrunHook)
if !ok {
@@ -278,7 +281,7 @@ func (ar *allocRunner) postrun() error {
}
if err := post.Postrun(); err != nil {
return fmt.Errorf("hook %q failed: %v", name, err)
merr.Errors = append(merr.Errors, fmt.Errorf("post-run hook %q failed: %w", name, err))
}
if ar.logger.IsTrace() {
@@ -287,7 +290,7 @@ func (ar *allocRunner) postrun() error {
}
}
return nil
return helper.FlattenMultierror(merr.ErrorOrNil())
}
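A minimal sketch of the accumulate-then-flatten pattern postrun now uses, with the trace logging elided. runPostrunHooks is a hypothetical name, and helper.FlattenMultierror is assumed (from its name and from the test expectations below) to return a lone accumulated error as-is rather than wrapping it in a multierror:

func runPostrunHooks(hooks []interfaces.RunnerPostrunHook) error {
	var merr multierror.Error
	for _, hook := range hooks {
		// keep iterating: every hook runs even if an earlier one failed
		if err := hook.Postrun(); err != nil {
			merr.Errors = append(merr.Errors, fmt.Errorf("post-run hook %q failed: %w", hook.Name(), err))
		}
	}
	// nil when nothing failed, the bare error when exactly one hook failed,
	// otherwise a multierror
	return helper.FlattenMultierror(merr.ErrorOrNil())
}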
// destroy is used to run the runners destroy hooks. All hooks are run and

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
@@ -1587,6 +1588,44 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
require.Contains(t, last.Message, "by healthy_deadline")
}
// TestAllocRunner_Postrun asserts that all postrun hooks run even when one of them fails
func TestAllocRunner_Postrun(t *testing.T) {
ci.Parallel(t)
alloc := mock.BatchAlloc() // batch alloc runs to completion without a stop signal
conf, cleanup := testAllocRunnerConfig(t, alloc)
t.Cleanup(cleanup)
ar, err := NewAllocRunner(conf)
must.NoError(t, err)
// set up test hooks
good1, good2 := &allocPostrunHook{}, &allocPostrunHook{}
sadErr := errors.New("sad day")
bad := &allocPostrunHook{err: sadErr}
ar.(*allocRunner).runnerHooks = []interfaces.RunnerHook{
good1, bad, good2,
}
go ar.Run()
select {
case <-time.After(500 * time.Millisecond):
t.Errorf("allocrunner timeout")
case <-ar.WaitCh():
}
must.True(t, good1.ran, must.Sprint("first hook should run"))
must.True(t, bad.ran, must.Sprint("second hook should run"))
must.True(t, good2.ran, must.Sprint("third hook should run, even after second failed"))
// check postrun error return directly
err = ar.(*allocRunner).postrun()
must.ErrorIs(t, err, sadErr)
must.Eq(t, `post-run hook "test_postrun" failed: sad day`, err.Error())
}
// TestAllocRunner_Destroy asserts that Destroy kills and cleans up a running
// alloc.
func TestAllocRunner_Destroy(t *testing.T) {
@@ -1958,6 +1997,117 @@ func TestAllocRunner_Batch_KillTG(t *testing.T) {
})
}
// Test that alloc runner kills tasks in task group when stopping and
// fails tasks when job is batch job type and migrating
func TestAllocRunner_KillTG_DeadTasks(t *testing.T) {
ci.Parallel(t)
alloc := mock.BatchAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "10s"
alloc.AllocatedResources.Tasks[task.Name] = tr
task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "task 2"
task2.Driver = "mock_driver"
task2.Config["run_for"] = "1ms"
alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
alloc.AllocatedResources.Tasks[task2.Name] = tr
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
must.NoError(t, err)
defer destroy(ar)
go ar.Run()
upd := conf.StateUpdater.(*MockStateUpdater)
// Wait for running
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
must.NoError(t, err)
})
// Wait for completed task
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
// task should not have finished yet, task2 should be finished
if !last.TaskStates[task.Name].FinishedAt.IsZero() {
return false, fmt.Errorf("task should not be finished")
}
if last.TaskStates[task2.Name].FinishedAt.IsZero() {
return false, fmt.Errorf("task should be finished")
}
return true, nil
}, func(err error) {
must.NoError(t, err)
})
update := ar.Alloc().Copy()
migrate := true
update.DesiredTransition.Migrate = &migrate
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusFailed {
return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusFailed)
}
// task should be failed since it was killed, task2 should not
// be failed since it was already completed
if !last.TaskStates[task.Name].Failed {
return false, fmt.Errorf("task should be failed")
}
if last.TaskStates[task2.Name].Failed {
return false, fmt.Errorf("task should not be failed")
}
taskEvtSize := len(last.TaskStates[task.Name].Events)
task2EvtSize := len(last.TaskStates[task2.Name].Events)
if last.TaskStates[task.Name].Events[taskEvtSize-1].Type != structs.TaskKilled {
return false, fmt.Errorf("got last task event type %q; want %q",
last.TaskStates[task.Name].Events[taskEvtSize-1].Type, structs.TaskKilled)
}
if last.TaskStates[task2.Name].Events[task2EvtSize-1].Type != structs.TaskTerminated {
return false, fmt.Errorf("got last task event type %q; want %q",
last.TaskStates[task2.Name].Events[task2EvtSize-1].Type, structs.TaskTerminated)
}
return true, nil
}, func(err error) {
must.NoError(t, err)
})
}
// Test that alloc runner kills tasks in task group when another task fails
func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
ci.Parallel(t)
@@ -2716,3 +2866,20 @@ func TestAllocRunner_setHookStatsHandler(t *testing.T) {
must.True(t, ok)
must.NotNil(t, noopHandler)
}
type allocPostrunHook struct {
mut sync.Mutex
err error
ran bool
}
func (h *allocPostrunHook) Name() string {
return "test_postrun"
}
func (h *allocPostrunHook) Postrun() error {
h.mut.Lock()
defer h.mut.Unlock()
h.ran = true
return h.err
}

View File

@@ -30,8 +30,8 @@ type RunnerPreKillHook interface {
}
// A RunnerPostrunHook is executed after calling TaskRunner.Run, even for
// terminal allocations. Therefore Postrun hooks must be safe to call without
// first calling Prerun hooks.
// terminal allocations, and all Postrun hooks will be run even if any of them error.
// Therefore, Postrun hooks must be safe to call without first calling Prerun hooks.
type RunnerPostrunHook interface {
RunnerHook
Postrun() error
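A hypothetical hook sketching what that contract means in practice: because Postrun can run for a terminal allocation whose Prerun never executed, it checks for the state a Prerun would have created instead of assuming it exists. The cleanupHook type and its directory field are illustrative only (uses the standard os package):

type cleanupHook struct {
	dir string // working directory a corresponding Prerun hook would create
}

func (h *cleanupHook) Name() string { return "cleanup" }

func (h *cleanupHook) Postrun() error {
	if _, err := os.Stat(h.dir); os.IsNotExist(err) {
		// Prerun never ran for this terminal allocation; nothing to clean up
		return nil
	}
	return os.RemoveAll(h.dir)
}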

View File

@@ -1649,6 +1649,12 @@ func (c *Client) setupNode() error {
node.NodeResources.MinDynamicPort = newConfig.MinDynamicPort
node.NodeResources.MaxDynamicPort = newConfig.MaxDynamicPort
node.NodeResources.Processors = newConfig.Node.NodeResources.Processors
if node.NodeResources.Processors.Empty() {
node.NodeResources.Processors = structs.NodeProcessorResources{
Topology: &numalib.Topology{},
}
}
}
if node.ReservedResources == nil {
node.ReservedResources = &structs.NodeReservedResources{}
@@ -2227,7 +2233,7 @@ func (c *Client) updateNodeStatus() error {
c.triggerDiscovery()
return fmt.Errorf("failed to update status: %v", err)
}
end := time.Now()
endTime := time.Now()
if len(resp.EvalIDs) != 0 {
c.logger.Debug("evaluations triggered by node update", "num_evals", len(resp.EvalIDs))
@@ -2238,7 +2244,7 @@ func (c *Client) updateNodeStatus() error {
last := c.lastHeartbeat()
oldTTL := c.heartbeatTTL
haveHeartbeated := c.haveHeartbeated
c.heartbeatStop.setLastOk(time.Now())
c.heartbeatStop.setLastOk(endTime)
c.heartbeatTTL = resp.HeartbeatTTL
c.haveHeartbeated = true
c.heartbeatLock.Unlock()
@@ -2260,7 +2266,7 @@ func (c *Client) updateNodeStatus() error {
// We have potentially missed our TTL log how delayed we were
if haveHeartbeated {
c.logger.Warn("missed heartbeat",
"req_latency", end.Sub(start), "heartbeat_ttl", oldTTL, "since_last_heartbeat", time.Since(last))
"req_latency", endTime.Sub(start), "heartbeat_ttl", oldTTL, "since_last_heartbeat", time.Since(last))
}
}

View File

@@ -138,6 +138,15 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
// Chown note: A uid or gid of -1 means to not change that value.
if err = os.Chown(path, params.Uid, params.Gid); err != nil {
log.Error("error changing owner/group", "error", err, "uid", params.Uid, "gid", params.Gid)
// Failing to change ownership is fatal for this plugin. Since we have
// already created the directory, we should attempt to clean it up;
// otherwise, the operator would have to do this manually.
if err := os.RemoveAll(path); err != nil {
log.Error("failed to remove directory after create failure",
"error", err)
}
return nil, fmt.Errorf("error changing owner/group: %w", err)
}

View File

@@ -18,7 +18,7 @@ type SystemScanner interface {
// a single Topology, which can then be used to answer questions about the CPU
// topology of the system.
func Scan(scanners []SystemScanner) *Topology {
top := new(Topology)
top := &Topology{}
for _, scanner := range scanners {
scanner.ScanSystem(top)
}
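An illustrative wiring of Scan with a custom scanner, assuming ScanSystem takes only the *Topology and returns nothing, as the call above suggests. fakeScanner is not part of numalib; real scanners populate the topology from sysfs, cpuid, and similar sources:

type fakeScanner struct{}

// ScanSystem satisfies SystemScanner; a real implementation would fill in
// node, core, and distance details on the shared Topology.
func (fakeScanner) ScanSystem(top *Topology) {}

func exampleScan() *Topology {
	return Scan([]SystemScanner{fakeScanner{}})
}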