Add reschedule policy to API, and HCL parsing support.

This commit is contained in:
Preetha Appan
2018-01-18 14:49:01 -06:00
parent dd00e637ad
commit eab9d2da92
9 changed files with 326 additions and 8 deletions

View File

@@ -558,6 +558,7 @@ type Job struct {
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
Payload []byte
Reschedule *ReschedulePolicy
Meta map[string]string
VaultToken *string `mapstructure:"vault_token"`
Status *string

View File

@@ -135,6 +135,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Interval: helper.TimeToPtr(1 * time.Minute),
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
},
Tasks: []*Task{
{
KillTimeout: helper.TimeToPtr(5 * time.Second),
@@ -197,6 +201,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Interval: helper.TimeToPtr(1 * time.Minute),
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
},
Tasks: []*Task{
{
Name: "task1",
@@ -326,6 +334,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Delay: helper.TimeToPtr(25 * time.Second),
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
},
EphemeralDisk: &EphemeralDisk{
Sticky: helper.BoolToPtr(false),
Migrate: helper.BoolToPtr(false),
@@ -537,6 +549,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Interval: helper.TimeToPtr(1 * time.Minute),
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(2 * time.Second),
MaxParallel: helper.IntToPtr(2),
@@ -569,6 +585,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Interval: helper.TimeToPtr(1 * time.Minute),
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
MaxParallel: helper.IntToPtr(1),

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// MemoryStats holds memory usage related stats
@@ -78,6 +79,48 @@ func (r *RestartPolicy) Merge(rp *RestartPolicy) {
}
}
// ReschedulePolicy configures how Tasks are rescheduled when they crash or fail.
type ReschedulePolicy struct {
	// Attempts limits the number of rescheduling attempts that can occur in an interval.
	Attempts *int `mapstructure:"attempts"`

	// Interval is a duration in which we can limit the number of reschedule attempts.
	Interval *time.Duration `mapstructure:"interval"`
}
// Merge copies the non-nil fields of rp over the corresponding fields of r,
// so values set in rp take precedence. A nil rp leaves r unchanged.
func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
	// Guard against a nil argument so callers don't have to pre-check.
	if rp == nil {
		return
	}
	if rp.Interval != nil {
		r.Interval = rp.Interval
	}
	if rp.Attempts != nil {
		r.Attempts = rp.Attempts
	}
}
// Copy returns a shallow duplicate of the policy. A nil receiver yields nil.
func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
	if r == nil {
		return nil
	}
	dup := *r
	return &dup
}
// Empty reports whether the policy is nil or carries no effective settings,
// i.e. both Attempts and Interval are unset or zero.
func (r *ReschedulePolicy) Empty() bool {
	if r == nil {
		return true
	}
	attemptsSet := r.Attempts != nil && *r.Attempts != 0
	intervalSet := r.Interval != nil && *r.Interval != 0
	return !attemptsSet && !intervalSet
}
// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
@@ -222,14 +265,15 @@ func (e *EphemeralDisk) Canonicalize() {
// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name *string
Count *int
Constraints []*Constraint
Tasks []*Task
RestartPolicy *RestartPolicy
EphemeralDisk *EphemeralDisk
Update *UpdateStrategy
Meta map[string]string
Name *string
Count *int
Constraints []*Constraint
Tasks []*Task
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
Update *UpdateStrategy
Meta map[string]string
}
// NewTaskGroup creates a new TaskGroup.
@@ -272,6 +316,41 @@ func (g *TaskGroup) Canonicalize(job *Job) {
g.Update.Canonicalize()
}
// Merge the reschedule policy from the job
if jr, tr := job.Reschedule != nil, g.ReschedulePolicy != nil; jr && tr {
jobReschedule := job.Reschedule.Copy()
jobReschedule.Merge(g.ReschedulePolicy)
g.ReschedulePolicy = jobReschedule
} else if jr && !job.Reschedule.Empty() {
jobReschedule := job.Reschedule.Copy()
g.ReschedulePolicy = jobReschedule
}
// Merge with default reschedule policy
var defaultReschedulePolicy *ReschedulePolicy
switch *job.Type {
case "service":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
}
case "batch":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
}
default:
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0 * time.Second),
}
}
if g.ReschedulePolicy != nil {
defaultReschedulePolicy.Merge(g.ReschedulePolicy)
}
g.ReschedulePolicy = defaultReschedulePolicy
var defaultRestartPolicy *RestartPolicy
switch *job.Type {
case "service", "system":

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
)
@@ -268,6 +269,105 @@ func TestTaskGroup_Canonicalize_Update(t *testing.T) {
assert.Nil(t, tg.Update)
}
// TestTaskGroup_Canonicalize_ReschedulePolicy verifies that a task group's
// reschedule policy is built by layering the group-level stanza over the
// job-level stanza over the defaults for the job type (batch here).
func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
	cases := []struct {
		desc        string
		jobPolicy   *ReschedulePolicy
		groupPolicy *ReschedulePolicy
		want        *ReschedulePolicy
	}{
		{
			desc: "Default",
			want: &ReschedulePolicy{
				Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
				Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
			},
		},
		{
			desc: "Empty job reschedule policy",
			jobPolicy: &ReschedulePolicy{
				Attempts: helper.IntToPtr(0),
				Interval: helper.TimeToPtr(0),
			},
			want: &ReschedulePolicy{
				Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
				Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
			},
		},
		{
			desc: "Inherit from job",
			jobPolicy: &ReschedulePolicy{
				Attempts: helper.IntToPtr(1),
				Interval: helper.TimeToPtr(20 * time.Second),
			},
			want: &ReschedulePolicy{
				Attempts: helper.IntToPtr(1),
				Interval: helper.TimeToPtr(20 * time.Second),
			},
		},
		{
			desc: "Set in task",
			groupPolicy: &ReschedulePolicy{
				Attempts: helper.IntToPtr(5),
				Interval: helper.TimeToPtr(2 * time.Minute),
			},
			want: &ReschedulePolicy{
				Attempts: helper.IntToPtr(5),
				Interval: helper.TimeToPtr(2 * time.Minute),
			},
		},
		{
			desc: "Merge from job",
			jobPolicy: &ReschedulePolicy{
				Attempts: helper.IntToPtr(1),
			},
			groupPolicy: &ReschedulePolicy{
				Interval: helper.TimeToPtr(5 * time.Minute),
			},
			want: &ReschedulePolicy{
				Attempts: helper.IntToPtr(1),
				Interval: helper.TimeToPtr(5 * time.Minute),
			},
		},
		{
			desc: "Attempts from job, default interval",
			jobPolicy: &ReschedulePolicy{
				Attempts: helper.IntToPtr(1),
			},
			want: &ReschedulePolicy{
				Attempts: helper.IntToPtr(1),
				Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
			},
		},
	}

	for _, c := range cases {
		t.Run(c.desc, func(t *testing.T) {
			job := &Job{
				ID:         helper.StringToPtr("test"),
				Reschedule: c.jobPolicy,
				Type:       helper.StringToPtr(JobTypeBatch),
			}
			job.Canonicalize()

			tg := &TaskGroup{
				Name:             helper.StringToPtr("foo"),
				ReschedulePolicy: c.groupPolicy,
			}
			tg.Canonicalize(job)

			assert.Equal(t, c.want, tg.ReschedulePolicy)
		})
	}
}
// TestService_CheckRestart asserts Service.CheckRestart settings are properly
// inherited by Checks.
func TestService_CheckRestart(t *testing.T) {

View File

@@ -638,6 +638,11 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Mode: *taskGroup.RestartPolicy.Mode,
}
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
}
tg.EphemeralDisk = &structs.EphemeralDisk{
Sticky: *taskGroup.EphemeralDisk.Sticky,
SizeMB: *taskGroup.EphemeralDisk.SizeMB,

View File

@@ -108,6 +108,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
delete(m, "periodic")
delete(m, "vault")
delete(m, "parameterized")
delete(m, "reschedule")
// Set the ID and name to the object key
result.ID = helper.StringToPtr(obj.Keys[0].Token.Value().(string))
@@ -143,6 +144,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
"task",
"type",
"update",
"reschedule",
"vault",
"vault_token",
}
@@ -178,6 +180,13 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
}
}
// If we have a reschedule stanza, then parse that
if o := listVal.Filter("reschedule"); len(o.Items) > 0 {
if err := parseReschedulePolicy(&result.Reschedule, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', reschedule ->"))
}
}
// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@@ -274,6 +283,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"task",
"ephemeral_disk",
"update",
"reschedule",
"vault",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
@@ -313,6 +323,12 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse reschedule policy
if o := listVal.Filter("reschedule"); len(o.Items) > 0 {
if err := parseReschedulePolicy(&g.ReschedulePolicy, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', reschedule ->", n))
}
}
// Parse ephemeral disk
if o := listVal.Filter("ephemeral_disk"); len(o.Items) > 0 {
g.EphemeralDisk = &api.EphemeralDisk{}
@@ -417,6 +433,46 @@ func parseRestartPolicy(final **api.RestartPolicy, list *ast.ObjectList) error {
return nil
}
// parseReschedulePolicy decodes at most one `reschedule` block from the HCL
// object list into an api.ReschedulePolicy, storing the result in final.
// String durations such as "30m" are converted to time.Duration values.
func parseReschedulePolicy(final **api.ReschedulePolicy, list *ast.ObjectList) error {
	list = list.Elem()
	if len(list.Items) > 1 {
		return fmt.Errorf("only one 'reschedule' block allowed")
	}
	// Nothing to parse; leave *final untouched rather than indexing into an
	// empty slice (callers filter before calling, but guard anyway).
	if len(list.Items) == 0 {
		return nil
	}

	// Get the single reschedule block
	obj := list.Items[0]

	// Check for invalid keys
	valid := []string{
		"attempts",
		"interval",
	}
	if err := helper.CheckHCLKeys(obj.Val, valid); err != nil {
		return err
	}

	var m map[string]interface{}
	if err := hcl.DecodeObject(&m, obj.Val); err != nil {
		return err
	}

	var result api.ReschedulePolicy
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// Converts "interval" strings like "12h" into time.Duration.
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &result,
	})
	if err != nil {
		return err
	}
	if err := dec.Decode(m); err != nil {
		return err
	}

	*final = &result
	return nil
}
func parseConstraints(result *[]*api.Constraint, list *ast.ObjectList) error {
for _, o := range list.Elem().Items {
// Check for invalid keys

View File

@@ -94,6 +94,10 @@ func TestParse(t *testing.T) {
Delay: helper.TimeToPtr(15 * time.Second),
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
},
EphemeralDisk: &api.EphemeralDisk{
Sticky: helper.BoolToPtr(true),
SizeMB: helper.IntToPtr(150),
@@ -667,6 +671,36 @@ func TestParse(t *testing.T) {
},
false,
},
{
"reschedule-job.hcl",
&api.Job{
ID: helper.StringToPtr("foo"),
Name: helper.StringToPtr("foo"),
Type: helper.StringToPtr("batch"),
Datacenters: []string{"dc1"},
Reschedule: &api.ReschedulePolicy{
Attempts: helper.IntToPtr(15),
Interval: helper.TimeToPtr(30 * time.Minute),
},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Tasks: []*api.Task{
{
Name: "bar",
Driver: "raw_exec",
Config: map[string]interface{}{
"command": "bash",
"args": []interface{}{"-c", "echo hi"},
},
},
},
},
},
},
false,
},
}
for _, tc := range cases {

View File

@@ -48,6 +48,11 @@ job "binstore-storagelocker" {
mode = "delay"
}
reschedule {
attempts = 5
interval = "12h"
}
ephemeral_disk {
sticky = true
size = 150

View File

@@ -0,0 +1,18 @@
# Test fixture: a batch job with a job-level reschedule stanza, exercised by
# the "reschedule-job.hcl" case in the HCL parser tests.
job "foo" {
  datacenters = ["dc1"]
  type        = "batch"

  # Allow up to 15 reschedule attempts within each 30 minute interval.
  reschedule {
    attempts = 15
    interval = "30m"
  }

  group "bar" {
    count = 3

    task "bar" {
      driver = "raw_exec"

      config {
        command = "bash"
        args    = ["-c", "echo hi"]
      }
    }
  }
}