New delayed rescheduling options, validation function and unit tests

This commit is contained in:
Preetha Appan
2018-02-22 17:43:07 -06:00
parent dc183ad209
commit 3dd8cd1bfa
4 changed files with 455 additions and 56 deletions

View File

@@ -92,8 +92,10 @@ func Job() *structs.Job {
Mode: structs.RestartPolicyModeDelay,
},
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 2,
Interval: 10 * time.Minute,
Attempts: 2,
Interval: 10 * time.Minute,
Delay: 5 * time.Second,
DelayFunction: "linear",
},
Tasks: []*structs.Task{
{

View File

@@ -1499,8 +1499,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: &TaskGroup{},
New: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Second,
Attempts: 1,
Interval: 15 * time.Second,
Delay: 5 * time.Second,
DelayCeiling: 20 * time.Second,
DelayFunction: "exponential",
Unlimited: false,
},
},
Expected: &TaskGroupDiff{
@@ -1516,12 +1520,36 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "1",
},
{
Type: DiffTypeAdded,
Name: "Delay",
Old: "",
New: "5000000000",
},
{
Type: DiffTypeAdded,
Name: "DelayCeiling",
Old: "",
New: "20000000000",
},
{
Type: DiffTypeAdded,
Name: "DelayFunction",
Old: "",
New: "exponential",
},
{
Type: DiffTypeAdded,
Name: "Interval",
Old: "",
New: "15000000000",
},
{
Type: DiffTypeAdded,
Name: "Unlimited",
Old: "",
New: "false",
},
},
},
},
@@ -1531,8 +1559,12 @@ func TestTaskGroupDiff(t *testing.T) {
// ReschedulePolicy deleted
Old: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Second,
Attempts: 1,
Interval: 15 * time.Second,
Delay: 5 * time.Second,
DelayCeiling: 20 * time.Second,
DelayFunction: "exponential",
Unlimited: false,
},
},
New: &TaskGroup{},
@@ -1549,12 +1581,36 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "1",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Delay",
Old: "5000000000",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "DelayCeiling",
Old: "20000000000",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "DelayFunction",
Old: "exponential",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Interval",
Old: "15000000000",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Unlimited",
Old: "false",
New: "",
},
},
},
},
@@ -1564,14 +1620,22 @@ func TestTaskGroupDiff(t *testing.T) {
// ReschedulePolicy edited
Old: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
Attempts: 1,
Interval: 1 * time.Second,
DelayFunction: "exponential",
Delay: 20 * time.Second,
DelayCeiling: 1 * time.Minute,
Unlimited: false,
},
},
New: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 2,
Interval: 2 * time.Second,
Attempts: 2,
Interval: 2 * time.Second,
DelayFunction: "linear",
Delay: 30 * time.Second,
DelayCeiling: 1 * time.Minute,
Unlimited: true,
},
},
Expected: &TaskGroupDiff{
@@ -1587,12 +1651,30 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "1",
New: "2",
},
{
Type: DiffTypeEdited,
Name: "Delay",
Old: "20000000000",
New: "30000000000",
},
{
Type: DiffTypeEdited,
Name: "DelayFunction",
Old: "exponential",
New: "linear",
},
{
Type: DiffTypeEdited,
Name: "Interval",
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeEdited,
Name: "Unlimited",
Old: "false",
New: "true",
},
},
},
},
@@ -1625,12 +1707,36 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "1",
New: "1",
},
{
Type: DiffTypeNone,
Name: "Delay",
Old: "0",
New: "0",
},
{
Type: DiffTypeNone,
Name: "DelayCeiling",
Old: "0",
New: "0",
},
{
Type: DiffTypeNone,
Name: "DelayFunction",
Old: "",
New: "",
},
{
Type: DiffTypeEdited,
Name: "Interval",
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeNone,
Name: "Unlimited",
Old: "false",
New: "false",
},
},
},
},

View File

@@ -35,6 +35,8 @@ import (
"github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec"
"math"
hcodec "github.com/hashicorp/go-msgpack/codec"
)
@@ -2656,12 +2658,16 @@ var (
// Package-level default ReschedulePolicy values, one per job type named in the
// identifier.
// NOTE(review): this listing shows both the pre-change and post-change field
// lines of the commit; where a field appears twice only one of them survives.
var (
// DefaultServiceJobReschedulePolicy is the default policy value for service
// jobs. The fields added here select an exponential delay starting at 30s,
// capped at 1h, with unlimited attempts.
DefaultServiceJobReschedulePolicy = ReschedulePolicy{
Attempts: 2,
Interval: 1 * time.Hour,
Delay: 30 * time.Second,
DelayFunction: "exponential",
DelayCeiling: 1 * time.Hour,
Unlimited: true,
}
// DefaultBatchJobReschedulePolicy is the default policy value for batch
// jobs: one attempt per 24h interval with a linear 5s delay.
DefaultBatchJobReschedulePolicy = ReschedulePolicy{
Attempts: 1,
Interval: 24 * time.Hour,
Attempts: 1,
Interval: 24 * time.Hour,
Delay: 5 * time.Second,
DelayFunction: "linear",
}
)
@@ -2744,6 +2750,9 @@ func NewRestartPolicy(jobType string) *RestartPolicy {
}
// Lower bounds enforced by ReschedulePolicy.Validate.
const (
	// ReschedulePolicyMinInterval is the smallest Interval accepted for a
	// reschedule policy with a limited number of attempts.
	ReschedulePolicyMinInterval = 15 * time.Second

	// ReschedulePolicyMinDelay is the smallest Delay (and, for non-linear
	// delay functions, the smallest DelayCeiling) accepted for an enabled
	// reschedule policy.
	ReschedulePolicyMinDelay = 5 * time.Second
)

// RescheduleDelayFunctions lists the accepted values of
// ReschedulePolicy.DelayFunction.
var RescheduleDelayFunctions = [...]string{"linear", "exponential", "fibonacci"}
// ReschedulePolicy configures how Tasks are rescheduled when they crash or fail.
type ReschedulePolicy struct {
@@ -2753,7 +2762,20 @@ type ReschedulePolicy struct {
// Interval is a duration in which we can limit the number of reschedule attempts.
Interval time.Duration
//TODO delay
// Delay is a minimum duration to wait between reschedule attempts.
// The delay function determines how much subsequent reschedule attempts are delayed by.
Delay time.Duration
// DelayFunction determines how the delay progressively changes on subsequent reschedule
// attempts. Valid values are "exponential", "linear", and "fibonacci".
DelayFunction string
// DelayCeiling is an upper bound on the delay.
DelayCeiling time.Duration
// Unlimited allows infinite rescheduling attempts. Only allowed when delay is set between reschedule
// attempts.
Unlimited bool
}
func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
@@ -2765,17 +2787,151 @@ func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
return nrp
}
// Validate uses different criteria to validate the reschedule policy
// Delay must be a minimum of 5 seconds
// Delay Ceiling is ignored if Delay Function is "linear"
// Number of possible attempts is validated, given the interval, delay and delay function
func (r *ReschedulePolicy) Validate() error {
if r != nil && r.Attempts > 0 {
var mErr multierror.Error
// Check for ambiguous/confusing settings
if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", RestartPolicyMinInterval, r.Interval))
enabled := r != nil && (r.Attempts > 0 || r.Unlimited)
if !enabled {
return nil
}
var mErr multierror.Error
// Check for ambiguous/confusing settings
delayPreCheck := true
// Delay should be bigger than the default
if r.Delay.Nanoseconds() < ReschedulePolicyMinDelay.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, r.Delay))
delayPreCheck = false
}
// Must use a valid delay function
if !isValidDelayFunction(r.DelayFunction) {
multierror.Append(&mErr, fmt.Errorf("Invalid delay function %q, must be one of %q", r.DelayFunction, RescheduleDelayFunctions))
delayPreCheck = false
}
// Validate DelayCeiling if not using linear delay progression
if r.DelayFunction != "linear" {
if r.DelayCeiling.Nanoseconds() < ReschedulePolicyMinDelay.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Delay Ceiling cannot be less than %v (got %v)", ReschedulePolicyMinDelay, r.Delay))
delayPreCheck = false
}
if r.DelayCeiling < r.Delay {
multierror.Append(&mErr, fmt.Errorf("Delay Ceiling cannot be less than Delay %v (got %v)", r.Delay, r.DelayCeiling))
delayPreCheck = false
}
return mErr.ErrorOrNil()
}
return nil
//Validate Interval and other delay parameters if attempts are limited
if !r.Unlimited {
if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, r.Interval))
}
if !delayPreCheck {
// We can't cross validate the rest of the delay params if delayPreCheck fails, so return early
return mErr.ErrorOrNil()
}
crossValidationErr := r.validateDelayParams()
if crossValidationErr != nil {
multierror.Append(&mErr, crossValidationErr)
}
}
return mErr.ErrorOrNil()
}
// isValidDelayFunction reports whether delayFunc exactly matches one of the
// entries in RescheduleDelayFunctions.
func isValidDelayFunction(delayFunc string) bool {
	for i := range RescheduleDelayFunctions {
		if RescheduleDelayFunctions[i] == delayFunc {
			return true
		}
	}
	return false
}
// validateDelayParams cross-checks Attempts against Interval using the result
// of viableAttempts. It returns nil when the configured attempts are viable;
// otherwise it returns an aggregated error with two entries: how many attempts
// are actually possible, and the interval (rounded to the minute) needed to
// accommodate all of them.
func (r *ReschedulePolicy) validateDelayParams() error {
	viable, attempts, minInterval := r.viableAttempts()
	if viable {
		return nil
	}

	// The delay ceiling is only mentioned for non-linear delay functions.
	var attemptsErr error
	switch r.DelayFunction {
	case "linear":
		attemptsErr = fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v and delay function %q",
			attempts, r.Interval, r.Delay, r.DelayFunction)
	default:
		attemptsErr = fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, delay function %q, and delay ceiling %v",
			attempts, r.Interval, r.Delay, r.DelayFunction, r.DelayCeiling)
	}
	intervalErr := fmt.Errorf("Set the interval to at least %v to accommodate %v attempts",
		minInterval.Round(time.Minute), r.Attempts)

	var result multierror.Error
	multierror.Append(&result, attemptsErr)
	multierror.Append(&result, intervalErr)
	return result.ErrorOrNil()
}
// viableAttempts reports whether r.Attempts reschedule attempts fit within
// r.Interval given r.Delay, r.DelayFunction and, for the non-linear functions,
// r.DelayCeiling.
//
// It returns, in order:
//   - whether the configured number of attempts is achievable;
//   - the number of attempts that are possible (only meaningful when the first
//     result is false; the linear and fibonacci branches leave it at zero on
//     success);
//   - the interval required to accommodate all r.Attempts attempts.
//
// An unrecognized delay function yields (false, 0, 0).
func (r *ReschedulePolicy) viableAttempts() (bool, int, time.Duration) {
	var possibleAttempts int
	var recommendedInterval time.Duration
	valid := true
	switch r.DelayFunction {
	case "linear":
		recommendedInterval = time.Duration(r.Attempts) * r.Delay
		if r.Interval < recommendedInterval {
			valid = false
			// Guard the division: a zero delay would otherwise panic.
			if r.Delay != 0 {
				possibleAttempts = int(r.Interval / r.Delay)
			}
		}
	case "exponential":
		// The delay is doubled with integer arithmetic and saturates at the
		// largest representable duration. Computing 2^i * Delay directly
		// overflows int64 for large attempt counts, and the wrapped (possibly
		// negative) value would slip under the ceiling and corrupt both the
		// recommended interval and the attempt count.
		const maxDuration = time.Duration(math.MaxInt64)
		nextDelay := r.Delay
		for i := 0; i < r.Attempts; i++ {
			if nextDelay > r.DelayCeiling {
				// Past the ceiling every further attempt costs one ceiling.
				recommendedInterval += r.DelayCeiling
			} else {
				recommendedInterval = nextDelay
			}
			if recommendedInterval < r.Interval {
				possibleAttempts++
			}
			switch {
			case nextDelay > maxDuration/2:
				nextDelay = maxDuration
			case nextDelay < -maxDuration/2:
				nextDelay = -maxDuration
			default:
				nextDelay *= 2
			}
		}
		if possibleAttempts < r.Attempts {
			valid = false
		}
	case "fibonacci":
		// slots[i] is the value associated with attempt i; the first two
		// entries are both the initial delay.
		slots := []time.Duration{r.Delay, r.Delay}
		reachedCeiling := false
		for i := 2; i < r.Attempts; i++ {
			var nextDelay time.Duration
			if reachedCeiling {
				// switch to linear: grow by one ceiling per attempt
				nextDelay = slots[i-1] + r.DelayCeiling
			} else {
				nextDelay = slots[i-1] + slots[i-2]
				if nextDelay > r.DelayCeiling {
					nextDelay = r.DelayCeiling
					reachedCeiling = true
				}
			}
			slots = append(slots, nextDelay)
		}
		recommendedInterval = slots[len(slots)-1]
		if r.Interval < recommendedInterval {
			valid = false
			// The index of the first slot exceeding the interval is the
			// number of attempts that still fit.
			for i := 0; i < len(slots); i++ {
				if slots[i] > r.Interval {
					possibleAttempts = i
					break
				}
			}
		}
	default:
		return false, 0, 0
	}
	if possibleAttempts < 0 { // can happen if delay is bigger than interval
		possibleAttempts = 0
	}
	return valid, possibleAttempts, recommendedInterval
}
func NewReschedulePolicy(jobType string) *ReschedulePolicy {
@@ -2920,12 +3076,14 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}
if tg.ReschedulePolicy != nil {
if err := tg.ReschedulePolicy.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
if j.Type != JobTypeSystem {
if tg.ReschedulePolicy != nil {
if err := tg.ReschedulePolicy.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
} else {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a reschedule policy", tg.Name))
}
} else {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a reschedule policy", tg.Name))
}
if tg.EphemeralDisk != nil {

View File

@@ -570,8 +570,10 @@ func testJob() *Job {
Delay: 1 * time.Minute,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 10,
Interval: 5 * time.Minute,
Attempts: 10,
Delay: 5 * time.Second,
DelayFunction: "linear",
},
Tasks: []*Task{
{
@@ -930,6 +932,7 @@ func TestTaskGroup_Validate(t *testing.T) {
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 5,
Delay: 5 * time.Second,
},
}
err := tg.Validate(j)
@@ -1012,8 +1015,10 @@ func TestTaskGroup_Validate(t *testing.T) {
Mode: RestartPolicyModeDelay,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 10,
Interval: 5 * time.Minute,
Attempts: 10,
Delay: 5 * time.Second,
DelayFunction: "linear",
},
}
@@ -2424,45 +2429,173 @@ func TestRestartPolicy_Validate(t *testing.T) {
// TestReschedulePolicy_Validate runs ReschedulePolicy.Validate over a table of
// policies and checks the text of the returned error.
// NOTE(review): this listing interleaves the pre-change and post-change lines
// of the commit (e.g. both the err and errors fields, and both loop bodies).
func TestReschedulePolicy_Validate(t *testing.T) {
type testCase struct {
desc string
ReschedulePolicy *ReschedulePolicy
err error
// errors lists errors whose messages must each be contained in the error
// returned by Validate; when nil, Validate is required to return nil.
errors []error
}
testCases := []testCase{
{
desc: "Nil",
},
{
desc: "Disabled",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second},
err: nil,
},
{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 5 * time.Minute},
err: nil,
},
{
desc: "Disabled",
ReschedulePolicy: &ReschedulePolicy{
Attempts: -1,
Interval: 5 * time.Minute},
err: nil,
},
{
desc: "Valid Linear Delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second},
err: fmt.Errorf("Interval cannot be less than %v (got %v)", RestartPolicyMinInterval, time.Second),
Attempts: 1,
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
DelayFunction: "linear"},
},
{
desc: "Valid Exponential Delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 5,
Interval: 1 * time.Hour,
Delay: 30 * time.Second,
DelayCeiling: 5 * time.Minute,
DelayFunction: "exponential"},
},
{
desc: "Valid Fibonacci Delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 5,
Interval: 15 * time.Minute,
Delay: 10 * time.Second,
DelayCeiling: 5 * time.Minute,
DelayFunction: "fibonacci"},
},
{
desc: "Invalid delay function",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
DelayFunction: "blah"},
errors: []error{
fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, time.Second),
fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
fmt.Errorf("Invalid delay function %q, must be one of %q", "blah", RescheduleDelayFunctions),
},
},
{
desc: "Invalid delay ceiling",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 8 * time.Second,
DelayFunction: "exponential",
Delay: 15 * time.Second,
DelayCeiling: 5 * time.Second},
errors: []error{
fmt.Errorf("Delay Ceiling cannot be less than Delay %v (got %v)", 15*time.Second, 5*time.Second),
},
},
{
desc: "Invalid delay and interval",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
DelayFunction: "linear"},
errors: []error{
fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, time.Second),
fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
},
}, {
// Should suggest 3h20m (10 attempts x 20m = 200m) as the interval
desc: "Invalid Attempts - linear delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 10,
Interval: 1 * time.Hour,
Delay: 20 * time.Minute,
DelayFunction: "linear",
},
errors: []error{
fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v and delay function %q", 3, time.Hour, 20*time.Minute, "linear"),
fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", 200*time.Minute, 10),
},
},
{
// Should suggest 4h40m as the interval
// Delay progression in minutes {5, 10, 20, 40, 40, 40, 40, 40, 40, 40}
desc: "Invalid Attempts - exponential delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 10,
Interval: 30 * time.Minute,
Delay: 5 * time.Minute,
DelayCeiling: 40 * time.Minute,
DelayFunction: "exponential",
},
errors: []error{
fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+
"delay function %q, and delay ceiling %v", 3, 30*time.Minute, 5*time.Minute, "exponential", 40*time.Minute),
fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", 280*time.Minute, 10),
},
},
{
// Should suggest 8h as the interval
// Delay progression in minutes {20, 20, 40, 60, 80, 80, 80, 80, 80, 80}
desc: "Invalid Attempts - fibonacci delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 10,
Interval: 1 * time.Hour,
Delay: 20 * time.Minute,
DelayCeiling: 80 * time.Minute,
DelayFunction: "fibonacci",
},
errors: []error{
fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+
"delay function %q, and delay ceiling %v", 4, 1*time.Hour, 20*time.Minute, "fibonacci", 80*time.Minute),
fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", 480*time.Minute, 10),
},
},
{
// No Interval is set here; the case expects no error because Unlimited is true.
desc: "Valid Unlimited config",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Unlimited: true,
DelayFunction: "exponential",
Delay: 5 * time.Minute,
DelayCeiling: 1 * time.Hour,
},
},
{
// Delay and DelayCeiling are left at zero, so both minimum checks are expected to fail.
desc: "Invalid Unlimited config",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
Unlimited: true,
DelayFunction: "exponential",
},
errors: []error{
fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
fmt.Errorf("Delay Ceiling cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
},
},
}
assert := assert.New(t)
for _, tc := range testCases {
if tc.err != nil {
assert.Contains(tc.ReschedulePolicy.Validate().Error(), tc.err.Error())
} else {
assert.Nil(tc.err)
}
// Each table entry runs as its own subtest named after tc.desc.
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)
gotErr := tc.ReschedulePolicy.Validate()
if tc.errors != nil {
// Validate all errors
for _, err := range tc.errors {
require.Contains(gotErr.Error(), err.Error())
}
} else {
require.Nil(gotErr)
}
})
}
}
@@ -2719,7 +2852,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{0, 1 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 0, Interval: 1 * time.Minute},
ShouldReschedule: false,
},
{
@@ -2751,7 +2884,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusComplete,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{1, 1 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute},
ShouldReschedule: false,
},
{
@@ -2759,14 +2892,14 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{1, 1 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute},
ShouldReschedule: true,
},
{
Desc: "Reschedule with leftover attempts",
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
ReschedulePolicy: &ReschedulePolicy{2, 5 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 2, Interval: 5 * time.Minute},
FailTime: fail,
RescheduleTrackers: []*RescheduleEvent{
{
@@ -2780,7 +2913,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{1, 5 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 5 * time.Minute},
RescheduleTrackers: []*RescheduleEvent{
{
RescheduleTime: fail.Add(-6 * time.Minute).UTC().UnixNano(),
@@ -2793,7 +2926,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{2, 5 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 2, Interval: 5 * time.Minute},
RescheduleTrackers: []*RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),