From eab9d2da928d9582aa805f68910cbb6aaf96c406 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 18 Jan 2018 14:49:01 -0600 Subject: [PATCH] Add reschedule policy to API, and HCL parsing support. --- api/jobs.go | 1 + api/jobs_test.go | 20 +++++ api/tasks.go | 95 +++++++++++++++++++-- api/tasks_test.go | 100 +++++++++++++++++++++++ command/agent/job_endpoint.go | 5 ++ jobspec/parse.go | 56 +++++++++++++ jobspec/parse_test.go | 34 ++++++++ jobspec/test-fixtures/basic.hcl | 5 ++ jobspec/test-fixtures/reschedule-job.hcl | 18 ++++ 9 files changed, 326 insertions(+), 8 deletions(-) create mode 100644 jobspec/test-fixtures/reschedule-job.hcl diff --git a/api/jobs.go b/api/jobs.go index e68bef1e7..59097b004 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -558,6 +558,7 @@ type Job struct { Periodic *PeriodicConfig ParameterizedJob *ParameterizedJobConfig Payload []byte + Reschedule *ReschedulePolicy Meta map[string]string VaultToken *string `mapstructure:"vault_token"` Status *string diff --git a/api/jobs_test.go b/api/jobs_test.go index da7bfc99b..5bbc85ae3 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -135,6 +135,10 @@ func TestJobs_Canonicalize(t *testing.T) { Interval: helper.TimeToPtr(1 * time.Minute), Mode: helper.StringToPtr("delay"), }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(2), + Interval: helper.TimeToPtr(1 * time.Hour), + }, Tasks: []*Task{ { KillTimeout: helper.TimeToPtr(5 * time.Second), @@ -197,6 +201,10 @@ func TestJobs_Canonicalize(t *testing.T) { Interval: helper.TimeToPtr(1 * time.Minute), Mode: helper.StringToPtr("delay"), }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(2), + Interval: helper.TimeToPtr(1 * time.Hour), + }, Tasks: []*Task{ { Name: "task1", @@ -326,6 +334,10 @@ func TestJobs_Canonicalize(t *testing.T) { Delay: helper.TimeToPtr(25 * time.Second), Mode: helper.StringToPtr("delay"), }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(2), + Interval: 
helper.TimeToPtr(1 * time.Hour), + }, EphemeralDisk: &EphemeralDisk{ Sticky: helper.BoolToPtr(false), Migrate: helper.BoolToPtr(false), @@ -537,6 +549,10 @@ func TestJobs_Canonicalize(t *testing.T) { Interval: helper.TimeToPtr(1 * time.Minute), Mode: helper.StringToPtr("delay"), }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(2), + Interval: helper.TimeToPtr(1 * time.Hour), + }, Update: &UpdateStrategy{ Stagger: helper.TimeToPtr(2 * time.Second), MaxParallel: helper.IntToPtr(2), @@ -569,6 +585,10 @@ func TestJobs_Canonicalize(t *testing.T) { Interval: helper.TimeToPtr(1 * time.Minute), Mode: helper.StringToPtr("delay"), }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(2), + Interval: helper.TimeToPtr(1 * time.Hour), + }, Update: &UpdateStrategy{ Stagger: helper.TimeToPtr(1 * time.Second), MaxParallel: helper.IntToPtr(1), diff --git a/api/tasks.go b/api/tasks.go index a7e3de40a..283ad70d3 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" ) // MemoryStats holds memory usage related stats @@ -78,6 +79,48 @@ func (r *RestartPolicy) Merge(rp *RestartPolicy) { } } +// ReschedulePolicy configures how Tasks are rescheduled when they crash or fail. +type ReschedulePolicy struct { + // Attempts limits the number of rescheduling attempts that can occur in an interval. + Attempts *int `mapstructure:"attempts"` + + // Interval is a duration in which we can limit the number of reschedule attempts. 
+ Interval *time.Duration `mapstructure:"interval"` +} + +func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) { + if rp.Interval != nil { + r.Interval = rp.Interval + } + if rp.Attempts != nil { + r.Attempts = rp.Attempts + } +} + +func (r *ReschedulePolicy) Copy() *ReschedulePolicy { + if r == nil { + return nil + } + nrp := new(ReschedulePolicy) + *nrp = *r + return nrp +} + +func (r *ReschedulePolicy) Empty() bool { + if r == nil { + return true + } + + if r.Attempts != nil && *r.Attempts != 0 { + return false + } + + if r.Interval != nil && *r.Interval != 0 { + return false + } + return true +} + // CheckRestart describes if and when a task should be restarted based on // failing health checks. type CheckRestart struct { @@ -222,14 +265,15 @@ func (e *EphemeralDisk) Canonicalize() { // TaskGroup is the unit of scheduling. type TaskGroup struct { - Name *string - Count *int - Constraints []*Constraint - Tasks []*Task - RestartPolicy *RestartPolicy - EphemeralDisk *EphemeralDisk - Update *UpdateStrategy - Meta map[string]string + Name *string + Count *int + Constraints []*Constraint + Tasks []*Task + RestartPolicy *RestartPolicy + ReschedulePolicy *ReschedulePolicy + EphemeralDisk *EphemeralDisk + Update *UpdateStrategy + Meta map[string]string } // NewTaskGroup creates a new TaskGroup. 
@@ -272,6 +316,41 @@ func (g *TaskGroup) Canonicalize(job *Job) { g.Update.Canonicalize() } + // Merge the reschedule policy from the job + if jr, tr := job.Reschedule != nil, g.ReschedulePolicy != nil; jr && tr { + jobReschedule := job.Reschedule.Copy() + jobReschedule.Merge(g.ReschedulePolicy) + g.ReschedulePolicy = jobReschedule + } else if jr && !job.Reschedule.Empty() { + jobReschedule := job.Reschedule.Copy() + g.ReschedulePolicy = jobReschedule + } + + // Merge with default reschedule policy + var defaultReschedulePolicy *ReschedulePolicy + switch *job.Type { + case "service": + defaultReschedulePolicy = &ReschedulePolicy{ + Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval), + } + case "batch": + defaultReschedulePolicy = &ReschedulePolicy{ + Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + } + default: + defaultReschedulePolicy = &ReschedulePolicy{ + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0 * time.Second), + } + } + + if g.ReschedulePolicy != nil { + defaultReschedulePolicy.Merge(g.ReschedulePolicy) + } + g.ReschedulePolicy = defaultReschedulePolicy + var defaultRestartPolicy *RestartPolicy switch *job.Type { case "service", "system": diff --git a/api/tasks_test.go b/api/tasks_test.go index 7542c6094..7b2fab461 100644 --- a/api/tasks_test.go +++ b/api/tasks_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/assert" ) @@ -268,6 +269,105 @@ func TestTaskGroup_Canonicalize_Update(t *testing.T) { assert.Nil(t, tg.Update) } +// Verifies that reschedule policy is merged correctly +func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) { + type testCase struct { + desc string + jobReschedulePolicy *ReschedulePolicy + 
taskReschedulePolicy *ReschedulePolicy + expected *ReschedulePolicy + } + + testCases := []testCase{ + { + desc: "Default", + jobReschedulePolicy: nil, + taskReschedulePolicy: nil, + expected: &ReschedulePolicy{ + Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + }, + }, + { + desc: "Empty job reschedule policy", + jobReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(0), + Interval: helper.TimeToPtr(0), + }, + taskReschedulePolicy: nil, + expected: &ReschedulePolicy{ + Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts), + Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + }, + }, + { + desc: "Inherit from job", + jobReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(20 * time.Second), + }, + taskReschedulePolicy: nil, + expected: &ReschedulePolicy{ + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(20 * time.Second), + }, + }, + { + desc: "Set in task", + jobReschedulePolicy: nil, + taskReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(5), + Interval: helper.TimeToPtr(2 * time.Minute), + }, + expected: &ReschedulePolicy{ + Attempts: helper.IntToPtr(5), + Interval: helper.TimeToPtr(2 * time.Minute), + }, + }, + { + desc: "Merge from job", + jobReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(1), + }, + taskReschedulePolicy: &ReschedulePolicy{ + Interval: helper.TimeToPtr(5 * time.Minute), + }, + expected: &ReschedulePolicy{ + Attempts: helper.IntToPtr(1), + Interval: helper.TimeToPtr(5 * time.Minute), + }, + }, + { + desc: "Attempts from job, default interval", + jobReschedulePolicy: &ReschedulePolicy{ + Attempts: helper.IntToPtr(1), + }, + taskReschedulePolicy: nil, + expected: &ReschedulePolicy{ + Attempts: helper.IntToPtr(1), + Interval: 
helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + job := &Job{ + ID: helper.StringToPtr("test"), + Reschedule: tc.jobReschedulePolicy, + Type: helper.StringToPtr(JobTypeBatch), + } + job.Canonicalize() + tg := &TaskGroup{ + Name: helper.StringToPtr("foo"), + ReschedulePolicy: tc.taskReschedulePolicy, + } + tg.Canonicalize(job) + assert.Equal(t, tc.expected, tg.ReschedulePolicy) + }) + } +} + // TestService_CheckRestart asserts Service.CheckRestart settings are properly // inherited by Checks. func TestService_CheckRestart(t *testing.T) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 6b0e3a565..c661e4b0b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -638,6 +638,11 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { Mode: *taskGroup.RestartPolicy.Mode, } + tg.ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: *taskGroup.ReschedulePolicy.Attempts, + Interval: *taskGroup.ReschedulePolicy.Interval, + } + tg.EphemeralDisk = &structs.EphemeralDisk{ Sticky: *taskGroup.EphemeralDisk.Sticky, SizeMB: *taskGroup.EphemeralDisk.SizeMB, diff --git a/jobspec/parse.go b/jobspec/parse.go index babe41b17..f96785a15 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -108,6 +108,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { delete(m, "periodic") delete(m, "vault") delete(m, "parameterized") + delete(m, "reschedule") // Set the ID and name to the object key result.ID = helper.StringToPtr(obj.Keys[0].Token.Value().(string)) @@ -143,6 +144,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { "task", "type", "update", + "reschedule", "vault", "vault_token", } @@ -178,6 +180,13 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { } } + // If we have a reschedule stanza, then parse that + if o := listVal.Filter("reschedule"); len(o.Items) 
> 0 { + if err := parseReschedulePolicy(&result.Reschedule, o); err != nil { + return multierror.Prefix(err, "reschedule ->") + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -274,6 +283,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { "task", "ephemeral_disk", "update", + "reschedule", "vault", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { @@ -313,6 +323,12 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { } } + // Parse reschedule policy + if o := listVal.Filter("reschedule"); len(o.Items) > 0 { + if err := parseReschedulePolicy(&g.ReschedulePolicy, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', reschedule ->", n)) + } + } // Parse ephemeral disk if o := listVal.Filter("ephemeral_disk"); len(o.Items) > 0 { g.EphemeralDisk = &api.EphemeralDisk{} @@ -417,6 +433,46 @@ func parseRestartPolicy(final **api.RestartPolicy, list *ast.ObjectList) error { return nil } +func parseReschedulePolicy(final **api.ReschedulePolicy, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'reschedule' block allowed") + } + + // Get our reschedule object + obj := list.Items[0] + + // Check for invalid keys + valid := []string{ + "attempts", + "interval", + } + if err := helper.CheckHCLKeys(obj.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, obj.Val); err != nil { + return err + } + + var result api.ReschedulePolicy + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &result, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + + *final = &result + return nil +} + func parseConstraints(result *[]*api.Constraint, 
list *ast.ObjectList) error { for _, o := range list.Elem().Items { // Check for invalid keys diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 4134e9ee4..90901ba16 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -94,6 +94,10 @@ func TestParse(t *testing.T) { Delay: helper.TimeToPtr(15 * time.Second), Mode: helper.StringToPtr("delay"), }, + ReschedulePolicy: &api.ReschedulePolicy{ + Interval: helper.TimeToPtr(12 * time.Hour), + Attempts: helper.IntToPtr(5), + }, EphemeralDisk: &api.EphemeralDisk{ Sticky: helper.BoolToPtr(true), SizeMB: helper.IntToPtr(150), @@ -667,6 +671,36 @@ func TestParse(t *testing.T) { }, false, }, + { + "reschedule-job.hcl", + &api.Job{ + ID: helper.StringToPtr("foo"), + Name: helper.StringToPtr("foo"), + Type: helper.StringToPtr("batch"), + Datacenters: []string{"dc1"}, + Reschedule: &api.ReschedulePolicy{ + Attempts: helper.IntToPtr(15), + Interval: helper.TimeToPtr(30 * time.Minute), + }, + TaskGroups: []*api.TaskGroup{ + { + Name: helper.StringToPtr("bar"), + Count: helper.IntToPtr(3), + Tasks: []*api.Task{ + { + Name: "bar", + Driver: "raw_exec", + Config: map[string]interface{}{ + "command": "bash", + "args": []interface{}{"-c", "echo hi"}, + }, + }, + }, + }, + }, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 81480bc23..9942e3dfc 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -48,6 +48,11 @@ job "binstore-storagelocker" { mode = "delay" } + reschedule { + attempts = 5 + interval = "12h" + } + ephemeral_disk { sticky = true size = 150 diff --git a/jobspec/test-fixtures/reschedule-job.hcl b/jobspec/test-fixtures/reschedule-job.hcl new file mode 100644 index 000000000..323fef882 --- /dev/null +++ b/jobspec/test-fixtures/reschedule-job.hcl @@ -0,0 +1,18 @@ +job "foo" { + datacenters = ["dc1"] + type = "batch" + reschedule { + attempts = 15 + interval = "30m" + } + group "bar" { + 
count = 3 + task "bar" { + driver = "raw_exec" + config { + command = "bash" + args = ["-c", "echo hi"] + } + } + } +}