server: stop after client disconnect (#7939)

* jobspec, api: add stop_after_client_disconnect
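
A rough illustration of the new knob from the Go api package; the group name
and count below are arbitrary, while the StopAfterClientDisconnect field and
the TimeToPtr helper are the ones added/used in the diffs that follow:

    package main

    import (
        "time"

        "github.com/hashicorp/nomad/api"
        "github.com/hashicorp/nomad/helper"
    )

    func main() {
        // Stop this group's allocations two minutes after their client is lost.
        tg := api.NewTaskGroup("web", 1)
        tg.StopAfterClientDisconnect = helper.TimeToPtr(120 * time.Second)
        _ = tg
    }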

* nomad/state/state_store: error message typo

* structs: alloc methods to support stop_after_client_disconnect

1. a global AllocStates field to track status changes with timestamps. We
   need this to record the time at which the alloc originally became lost.

2. ShouldClientStop() and WaitClientStop() to do the timing math (sketched
   just below)
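
A sketch of how these two are meant to be consumed (the delayedStops wrapper
below is a made-up name; the real consumer is delayByStopAfterClientDisconnect
in the scheduler/reconcile_util diff further down, and the snippet assumes the
usual time and nomad/structs imports):

    // delayedStops returns, for each lost alloc whose group sets
    // stop_after_client_disconnect, the time before which it should not be
    // rescheduled.
    func delayedStops(lost []*structs.Allocation) map[string]time.Time {
        delays := map[string]time.Time{}
        now := time.Now().UTC()
        for _, alloc := range lost {
            if !alloc.ShouldClientStop() {
                // no stop_after_client_disconnect configured on the group
                continue
            }
            // WaitClientStop = time the alloc was marked lost
            //                + stop_after_client_disconnect
            //                + the group's largest kill_timeout
            if t := alloc.WaitClientStop(); t.After(now) {
                delays[alloc.ID] = t
            }
        }
        return delays
    }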

* scheduler/reconcile_util: delayByStopAfterClientDisconnect

* scheduler/reconcile: use delayByStopAfterClientDisconnect

* scheduler/util: updateNonTerminalAllocsToLost comments

This was set up to update allocs to lost only if the DesiredStatus had
already been set by the scheduler. It seems the intention was to update the
status from any non-terminal state, and not all lost allocs have been marked
stop or evict by this point.

* scheduler/testing: AssertEvalStatus just use require

* scheduler/generic_sched: don't create a blocked eval if delayed
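
In isolation, the new guard amounts to the sketch below; shouldCreateBlockedEval
is a made-up name, and the actual condition is written inline in the
generic_sched.go hunk further down:

    // A delayed follow-up eval carries a WaitUntil timestamp and will be
    // re-processed once the stop_after_client_disconnect window passes, so it
    // must not also spawn a blocked eval for its failed placements.
    func shouldCreateBlockedEval(eval *structs.Evaluation, failedPlacements int, alreadyBlocked bool) bool {
        return eval.Status != structs.EvalStatusBlocked &&
            failedPlacements != 0 &&
            !alreadyBlocked &&
            eval.WaitUntil.IsZero()
    }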

* scheduler/generic_sched_test: several scheduling cases

This commit is contained in:
Lang Martin
2020-05-13 16:39:04 -04:00
committed by GitHub
parent e3f9bebad1
commit cd6d34425f
16 changed files with 411 additions and 37 deletions

View File

@@ -411,23 +411,24 @@ func (vm *VolumeMount) Canonicalize() {
// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name *string
Count *int
Constraints []*Constraint
Affinities []*Affinity
Tasks []*Task
Spreads []*Spread
Volumes map[string]*VolumeRequest
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
Update *UpdateStrategy
Migrate *MigrateStrategy
Networks []*NetworkResource
Meta map[string]string
Services []*Service
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
Scaling *ScalingPolicy
Name *string
Count *int
Constraints []*Constraint
Affinities []*Affinity
Tasks []*Task
Spreads []*Spread
Volumes map[string]*VolumeRequest
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
Update *UpdateStrategy
Migrate *MigrateStrategy
Networks []*NetworkResource
Meta map[string]string
Services []*Service
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect"`
Scaling *ScalingPolicy
}
// NewTaskGroup creates a new TaskGroup.

View File

@@ -91,7 +91,7 @@ func (h *heartbeatStop) watch() {
select {
case allocID := <-stop:
if err := h.stopAlloc(allocID); err != nil {
h.logger.Warn("stopping alloc %s on heartbeat timeout failed: %v", allocID, err)
h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err)
continue
}
delete(h.allocInterval, allocID)
@@ -142,6 +142,8 @@ func (h *heartbeatStop) stopAlloc(allocID string) error {
return err
}
h.logger.Debug("stopping alloc for stop_after_client_disconnect", "alloc", allocID)
runner.Destroy()
return nil
}

View File

@@ -793,6 +793,10 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.ShutdownDelay = taskGroup.ShutdownDelay
}
if taskGroup.StopAfterClientDisconnect != nil {
tg.StopAfterClientDisconnect = taskGroup.StopAfterClientDisconnect
}
if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,

View File

@@ -56,6 +56,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"service",
"volume",
"scaling",
"stop_after_client_disconnect",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))

View File

@@ -184,6 +184,7 @@ func TestParse(t *testing.T) {
},
},
},
StopAfterClientDisconnect: helper.TimeToPtr(120 * time.Second),
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),

View File

@@ -152,6 +152,8 @@ job "binstore-storagelocker" {
}
}
stop_after_client_disconnect = "120s"
task "binstore" {
driver = "docker"
user = "bob"

View File

@@ -4623,7 +4623,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
}
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
default:
s.logger.Error("invalid old client status for allocatio",
s.logger.Error("invalid old client status for allocation",
"alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus)
}
summaryChanged = true

View File

@@ -238,6 +238,20 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
}
}
// StopAfterClientDisconnect diff
if oldPrimitiveFlat != nil && newPrimitiveFlat != nil {
if tg.StopAfterClientDisconnect == nil {
oldPrimitiveFlat["StopAfterClientDisconnect"] = ""
} else {
oldPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *tg.StopAfterClientDisconnect)
}
if other.StopAfterClientDisconnect == nil {
newPrimitiveFlat["StopAfterClientDisconnect"] = ""
} else {
newPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *other.StopAfterClientDisconnect)
}
}
// Diff the primitive fields.
diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, false)

View File

@@ -3799,6 +3799,10 @@ func (j *Job) Validate() error {
mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value"))
}
if tg.StopAfterClientDisconnect != nil && *tg.StopAfterClientDisconnect < 0 {
mErr.Errors = append(mErr.Errors, errors.New("StopAfterClientDisconnect must be a positive value"))
}
if j.Type == "system" && tg.Count > 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %s has count %d. Count cannot exceed 1 with system scheduler",
@@ -5265,6 +5269,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.ShutdownDelay = tg.ShutdownDelay
}
if tg.StopAfterClientDisconnect != nil {
ntg.StopAfterClientDisconnect = tg.StopAfterClientDisconnect
}
return ntg
}
@@ -6516,6 +6524,19 @@ func (t *Template) Warnings() error {
return mErr.ErrorOrNil()
}
// AllocState records a single event that changes the state of the whole allocation
type AllocStateField uint8
const (
AllocStateFieldClientStatus AllocStateField = iota
)
type AllocState struct {
Field AllocStateField
Value string
Time time.Time
}
// Set of possible states for a task.
const (
TaskStatePending = "pending" // The task is waiting to be run.
@@ -8152,6 +8173,9 @@ type Allocation struct {
// TaskStates stores the state of each task,
TaskStates map[string]*TaskState
// AllocStates track meta data associated with changes to the state of the whole allocation, like becoming lost
AllocStates []*AllocState
// PreviousAllocation is the allocation that this allocation is replacing
PreviousAllocation string
@@ -8420,6 +8444,49 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return nextRescheduleTime, rescheduleEligible
}
// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil ||
tg.StopAfterClientDisconnect == nil ||
*tg.StopAfterClientDisconnect == 0*time.Nanosecond {
return false
}
return true
}
// WaitClientStop uses the reschedule delay mechanism to block rescheduling until
// StopAfterClientDisconnect's block interval passes
func (a *Allocation) WaitClientStop() time.Time {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
// An alloc can only be marked lost once, so use the first lost transition
var t time.Time
for _, s := range a.AllocStates {
if s.Field == AllocStateFieldClientStatus &&
s.Value == AllocClientStatusLost {
t = s.Time
break
}
}
// On the first pass, the alloc hasn't been marked lost yet, and so we start
// counting from now
if t.IsZero() {
t = time.Now().UTC()
}
// Find the max kill timeout
kill := DefaultKillTimeout
for _, t := range tg.Tasks {
if t.KillTimeout > kill {
kill = t.KillTimeout
}
}
return t.Add(*tg.StopAfterClientDisconnect + kill)
}
// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
@@ -8476,6 +8543,24 @@ func (a *Allocation) Terminated() bool {
return false
}
// SetStopped updates the allocation in place to a DesiredStatus stop, with the ClientStatus
func (a *Allocation) SetStop(clientStatus, clientDesc string) {
a.DesiredStatus = AllocDesiredStatusStop
a.ClientStatus = clientStatus
a.ClientDescription = clientDesc
a.AppendState(AllocStateFieldClientStatus, clientStatus)
}
// AppendState creates and appends an AllocState entry recording the time of the state
// transition. Used to mark the transition to lost
func (a *Allocation) AppendState(field AllocStateField, value string) {
a.AllocStates = append(a.AllocStates, &AllocState{
Field: field,
Value: value,
Time: time.Now().UTC(),
})
}
// RanSuccessfully returns whether the client has ran the allocation and all
// tasks finished successfully. Critically this function returns whether the
// allocation has ran to completion and not just that the alloc has converged to
@@ -9384,6 +9469,8 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s
newAlloc.ClientStatus = clientStatus
}
newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)
node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)

View File

@@ -3595,13 +3595,21 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
expectedAlloc := new(Allocation)
*expectedAlloc = *alloc
expectedAlloc.DesiredDescription = desiredDesc
expectedAlloc.DesiredStatus = AllocDesiredStatusStop
expectedAlloc.ClientStatus = AllocClientStatusLost
expectedAlloc.Job = nil
expectedAlloc.AllocStates = []*AllocState{{
Field: AllocStateFieldClientStatus,
Value: "lost",
}}
// This value is set to time.Now() in AppendStoppedAlloc, so clear it
appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
appendedAlloc.AllocStates[0].Time = time.Time{}
assert.Equal(t, expectedAlloc, appendedAlloc)
assert.Equal(t, alloc.Job, plan.Job)
}
@@ -4372,6 +4380,65 @@ func TestAllocation_NextDelay(t *testing.T) {
}
func TestAllocation_WaitClientStop(t *testing.T) {
type testCase struct {
desc string
stop time.Duration
status string
expectedShould bool
expectedRescheduleTime time.Time
}
now := time.Now().UTC()
testCases := []testCase{
{
desc: "running",
stop: 2 * time.Second,
status: AllocClientStatusRunning,
expectedShould: true,
},
{
desc: "no stop_after_client_disconnect",
status: AllocClientStatusLost,
expectedShould: false,
},
{
desc: "stop",
status: AllocClientStatusLost,
stop: 2 * time.Second,
expectedShould: true,
expectedRescheduleTime: now.Add((2 + 5) * time.Second),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
j := testJob()
a := &Allocation{
ClientStatus: tc.status,
Job: j,
TaskStates: map[string]*TaskState{},
}
if tc.status == AllocClientStatusLost {
a.AppendState(AllocStateFieldClientStatus, AllocClientStatusLost)
}
j.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
a.TaskGroup = j.TaskGroups[0].Name
require.Equal(t, tc.expectedShould, a.ShouldClientStop())
if !tc.expectedShould || tc.status != AllocClientStatusLost {
return
}
// the reschedTime is close to the expectedRescheduleTime
reschedTime := a.WaitClientStop()
e := reschedTime.Unix() - tc.expectedRescheduleTime.Unix()
require.Less(t, e, int64(2))
})
}
}
func TestAllocation_Canonicalize_Old(t *testing.T) {
alloc := MockAlloc()
alloc.AllocatedResources = nil

View File

@@ -259,8 +259,10 @@ func (s *GenericScheduler) process() (bool, error) {
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available. If the
// current evaluation is already a blocked eval, we reuse it by submitting
// a new eval to the planner in createBlockedEval
if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil {
// a new eval to the planner in createBlockedEval. If the current eval is
// pending with WaitUntil set, it's delayed rather than blocked.
if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
s.eval.WaitUntil.IsZero() {
if err := s.createBlockedEval(false); err != nil {
s.logger.Error("failed to make blocked eval", "error", err)
return false, err
@@ -338,7 +340,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Update the allocations which are in pending/running state on tainted
// nodes to lost
// nodes to lost, but only if the scheduler has already marked them
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
reconciler := NewAllocReconciler(s.logger,

View File

@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -2768,6 +2769,167 @@ func TestServiceSched_NodeDown(t *testing.T) {
}
}
func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
cases := []struct {
stop time.Duration
when time.Time
rescheduled bool
}{
{
rescheduled: true,
},
{
stop: 1 * time.Second,
rescheduled: false,
},
{
stop: 1 * time.Second,
when: time.Now().UTC().Add(-10 * time.Second),
rescheduled: true,
},
{
stop: 1 * time.Second,
when: time.Now().UTC().Add(10 * time.Minute),
rescheduled: false,
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf(""), func(t *testing.T) {
h := NewHarness(t)
// Node, which is down
node := mock.Node()
node.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node))
// Job with allocations and stop_after_client_disconnect
job := mock.Job()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))
// Alloc for the running group
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusRunning
if !tc.when.IsZero() {
alloc.AllocStates = []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusLost,
Time: tc.when,
}}
}
allocs := []*structs.Allocation{alloc}
require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to deal with drain
evals := []*structs.Evaluation{{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}}
eval := evals[0]
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), evals))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
require.Equal(t, h.Evals[0].Status, structs.EvalStatusComplete)
require.Len(t, h.Plans, 1, "plan")
// Followup eval created
require.True(t, len(h.CreateEvals) > 0)
e := h.CreateEvals[0]
require.Equal(t, eval.ID, e.PreviousEval)
if tc.rescheduled {
require.Equal(t, "blocked", e.Status)
} else {
require.Equal(t, "pending", e.Status)
require.NotEmpty(t, e.WaitUntil)
}
// This eval is still being inserted in the state store
ws := memdb.NewWatchSet()
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(ws, e.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}
return true, nil
}, func(err error) {
require.NoError(t, err)
})
alloc, err = h.State.AllocByID(ws, alloc.ID)
require.NoError(t, err)
// Allocations have been transitioned to lost
require.Equal(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
require.Equal(t, structs.AllocClientStatusLost, alloc.ClientStatus)
// At least 1, 2 if we manually set the tc.when
require.NotEmpty(t, alloc.AllocStates)
if tc.rescheduled {
// Register a new node, leave it up, process the followup eval
node = mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node))
require.NoError(t, h.Process(NewServiceScheduler, eval))
as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
if err != nil {
return false, err
}
return len(as) == 2, nil
}, func(err error) {
require.NoError(t, err)
})
a2 := as[0]
if a2.ID == alloc.ID {
a2 = as[1]
}
require.Equal(t, structs.AllocClientStatusPending, a2.ClientStatus)
require.Equal(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
require.Equal(t, node.ID, a2.NodeID)
// No blocked evals
require.Empty(t, h.ReblockEvals)
require.Len(t, h.CreateEvals, 1)
require.Equal(t, h.CreateEvals[0].ID, e.ID)
} else {
// No new alloc was created
as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
require.Len(t, as, 1)
old := as[0]
require.Equal(t, alloc.ID, old.ID)
require.Equal(t, structs.AllocClientStatusLost, old.ClientStatus)
require.Equal(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
}
})
}
}
func TestServiceSched_NodeUpdate(t *testing.T) {
h := NewHarness(t)

View File

@@ -353,10 +353,20 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
rescheduleLater = append(rescheduleLater, lostLater...)
// Create batched follow up evaluations for allocations that are
// reschedulable later and mark the allocations for in place updating
a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
// Allocs that are lost and delayed have an attributeUpdate that correctly links to
// the eval, but incorrectly has the current (running) status
for _, d := range lostLater {
a.result.attributeUpdates[d.allocID].SetStop(structs.AllocClientStatusLost, structs.AllocClientStatusLost)
}
// Create a structure for choosing names. Seed with the taken names which is
// the union of untainted and migrating nodes (includes canaries)
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow))
@@ -413,9 +423,13 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * The deployment is not paused or failed
// * Not placing any canaries
// * If there are any canaries that they have been promoted
place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
if !existingDeployment {
dstate.DesiredTotal += len(place)
// * There is no delayed stop_after_client_disconnect alloc
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
}
// deploymentPlaceReady tracks whether the deployment is in a state where

View File

@@ -370,6 +370,28 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
return
}
// delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a
// stop_after_client_disconnect configured
func (as allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {
now := time.Now().UTC()
for _, a := range as {
if !a.ShouldClientStop() {
continue
}
t := a.WaitClientStop()
if t.After(now) {
later = append(later, &delayedRescheduleInfo{
allocID: a.ID,
alloc: a,
rescheduleTime: t,
})
}
}
return later
}
// allocNameIndex is used to select allocation names for placement or removal
// given an existing set of placed allocations.
type allocNameIndex struct {

View File

@@ -6,6 +6,7 @@ import (
"time"
testing "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
@@ -272,12 +273,7 @@ func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
}
func (h *Harness) AssertEvalStatus(t testing.T, state string) {
if len(h.Evals) != 1 {
t.Fatalf("bad: %#v", h.Evals)
}
require.Len(t, h.Evals, 1)
update := h.Evals[0]
if update.Status != state {
t.Fatalf("bad: %#v", update)
}
require.Equal(t, state, update.Status)
}

View File

@@ -812,8 +812,8 @@ func adjustQueuedAllocations(logger log.Logger, result *structs.PlanResult, queu
}
}
// updateNonTerminalAllocsToLost updates the allocations which are in pending/running state on tainted node
// to lost
// updateNonTerminalAllocsToLost updates the allocations which are in pending/running state
// on tainted node to lost, but only for allocs already DesiredStatus stop or evict
func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*structs.Node, allocs []*structs.Allocation) {
for _, alloc := range allocs {
node, ok := tainted[alloc.NodeID]
@@ -826,8 +826,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
continue
}
// If the scheduler has marked it as stop or evict already but the alloc
// wasn't terminal on the client change the status to lost.
// If the alloc is already correctly marked lost, we're done
if (alloc.DesiredStatus == structs.AllocDesiredStatusStop ||
alloc.DesiredStatus == structs.AllocDesiredStatusEvict) &&
(alloc.ClientStatus == structs.AllocClientStatusRunning ||