diff --git a/api/compose_test.go b/api/compose_test.go index 68801519f..2a509bc55 100644 --- a/api/compose_test.go +++ b/api/compose_test.go @@ -69,6 +69,7 @@ func TestCompose(t *testing.T) { Operand: "=", }, }, + RestartPolicy: NewRestartPolicy(), Tasks: []*Task{ &Task{ Name: "task1", diff --git a/api/tasks.go b/api/tasks.go index c1d5bf2ff..2535d5ec5 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -1,19 +1,42 @@ package api +import ( + "time" +) + +// RestartPolicy defines how the Nomad client restarts +// tasks in a taskgroup when they fail +type RestartPolicy struct { + Interval time.Duration + Attempts int + Delay time.Duration +} + +func NewRestartPolicy() *RestartPolicy { + return &RestartPolicy{ + Attempts: 10, + Interval: 3 * time.Minute, + Delay: 5 * time.Second, + } +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { - Name string - Count int - Constraints []*Constraint - Tasks []*Task - Meta map[string]string + Name string + Count int + Constraints []*Constraint + Tasks []*Task + RestartPolicy *RestartPolicy + Meta map[string]string } // NewTaskGroup creates a new TaskGroup. 
func NewTaskGroup(name string, count int) *TaskGroup { + restartPolicy := NewRestartPolicy() return &TaskGroup{ - Name: name, - Count: count, + Name: name, + Count: count, + RestartPolicy: restartPolicy, } } diff --git a/api/tasks_test.go b/api/tasks_test.go index 877f84d5c..945fdf9bf 100644 --- a/api/tasks_test.go +++ b/api/tasks_test.go @@ -8,8 +8,9 @@ import ( func TestTaskGroup_NewTaskGroup(t *testing.T) { grp := NewTaskGroup("grp1", 2) expect := &TaskGroup{ - Name: "grp1", - Count: 2, + Name: "grp1", + Count: 2, + RestartPolicy: NewRestartPolicy(), } if !reflect.DeepEqual(grp, expect) { t.Fatalf("expect: %#v, got: %#v", expect, grp) diff --git a/command/init.go b/command/init.go index 0b9be934b..356337ae8 100644 --- a/command/init.go +++ b/command/init.go @@ -104,6 +104,17 @@ job "example" { # Defaults to 1 # count = 1 + # Restart Policy - This block defines the restart policy for TaskGroups. + # The attempts value defines the number of restarts Nomad will do if Tasks + # in this TaskGroup fail in a rolling window of interval duration. + # The delay value makes Nomad wait for that duration to restart after a Task + # fails or crashes. + restart { + interval = "5m" + attempts = 10 + delay = "25s" + } + # Define a task to run task "redis" { # Use Docker to run the task. 
diff --git a/jobspec/parse.go b/jobspec/parse.go index f63ac5294..77f9b819f 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -124,7 +124,7 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error { } } - // If we have tasks outside, do those + // If we have tasks outside, create TaskGroups for them if o := obj.Get("task", false); o != nil { var tasks []*structs.Task if err := parseTasks(&tasks, o); err != nil { @@ -134,9 +134,10 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error { result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2) for i, t := range tasks { result.TaskGroups[i] = &structs.TaskGroup{ - Name: t.Name, - Count: 1, - Tasks: []*structs.Task{t}, + Name: t.Name, + Count: 1, + Tasks: []*structs.Task{t}, + RestartPolicy: structs.NewRestartPolicy(result.Type), } } } @@ -180,6 +181,7 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error { delete(m, "constraint") delete(m, "meta") delete(m, "task") + delete(m, "restart") // Default count to 1 if not specified if _, ok := m["count"]; !ok { @@ -199,6 +201,11 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error { return err } } + g.RestartPolicy = structs.NewRestartPolicy(result.Type) + + if err := parseRestartPolicy(g.RestartPolicy, o); err != nil { + return err + } // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. 
@@ -228,6 +235,42 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error { return nil } +func parseRestartPolicy(result *structs.RestartPolicy, obj *hclobj.Object) error { + var restartHclObj *hclobj.Object + var m map[string]interface{} + if restartHclObj = obj.Get("restart", false); restartHclObj == nil { + return nil + } + if err := hcl.DecodeObject(&m, restartHclObj); err != nil { + return err + } + + if delay, ok := m["delay"]; ok { + d, err := toDuration(delay) + if err != nil { + return fmt.Errorf("Invalid Delay time in restart policy: %v", err) + } + result.Delay = d + } + + if interval, ok := m["interval"]; ok { + i, err := toDuration(interval) + if err != nil { + return fmt.Errorf("Invalid Interval time in restart policy: %v", err) + } + result.Interval = i + } + + if attempts, ok := m["attempts"]; ok { + a, err := toInteger(attempts) + if err != nil { + return fmt.Errorf("Invalid value in attempts: %v", err) + } + result.Attempts = a + } + return nil +} + func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error { for _, o := range obj.Elem(false) { var m map[string]interface{} @@ -455,19 +498,11 @@ func parseUpdate(result *structs.UpdateStrategy, obj *hclobj.Object) error { } for _, key := range []string{"stagger", "Stagger"} { if raw, ok := m[key]; ok { - switch v := raw.(type) { - case string: - dur, err := time.ParseDuration(v) - if err != nil { - return fmt.Errorf("invalid stagger time '%s'", raw) - } - m[key] = dur - case int: - m[key] = time.Duration(v) * time.Second - default: - return fmt.Errorf("invalid type for stagger time '%s'", - raw) + staggerTime, err := toDuration(raw) + if err != nil { + return fmt.Errorf("Invalid stagger time: %v", err) } + m[key] = staggerTime } } @@ -477,3 +512,35 @@ func parseUpdate(result *structs.UpdateStrategy, obj *hclobj.Object) error { } return nil } + +func toDuration(value interface{}) (time.Duration, error) { + var dur time.Duration + var err error + switch v := value.(type) 
{ + case string: + dur, err = time.ParseDuration(v) + case int: + dur = time.Duration(v) * time.Second + default: + err = fmt.Errorf("Invalid time %s", value) + } + + return dur, err +} + +func toInteger(value interface{}) (int, error) { + var integer int + var err error + switch v := value.(type) { + case string: + var i int64 + i, err = strconv.ParseInt(v, 10, 32) + integer = int(i) + case int: + integer = v + default: + err = fmt.Errorf("Value: %v can't be parsed into int", value) + } + + return integer, err +} diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index f91789ddb..e785443b7 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -48,6 +48,11 @@ func TestParse(t *testing.T) { &structs.TaskGroup{ Name: "outside", Count: 1, + RestartPolicy: &structs.RestartPolicy{ + Attempts: 2, + Interval: 1 * time.Minute, + Delay: 15 * time.Second, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "outside", @@ -77,6 +82,11 @@ func TestParse(t *testing.T) { "elb_interval": "10", "elb_checks": "3", }, + RestartPolicy: &structs.RestartPolicy{ + Interval: 10 * time.Minute, + Attempts: 5, + Delay: 15 * time.Second, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 941272b2d..bf81a6ae7 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -31,6 +31,11 @@ job "binstore-storagelocker" { group "binsl" { count = 5 + restart { + attempts = 5 + interval = "10m" + delay = "15s" + } task "binstore" { driver = "docker" config { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 87c426dce..329ecd872 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1,6 +1,9 @@ package mock -import "github.com/hashicorp/nomad/nomad/structs" +import ( + "github.com/hashicorp/nomad/nomad/structs" + "time" +) func Node() *structs.Node { node := &structs.Node{ @@ -71,6 +74,11 @@ func Job() *structs.Job { &structs.TaskGroup{ Name: "web", 
Count: 10, + RestartPolicy: &structs.RestartPolicy{ + Attempts: 3, + Interval: 10 * time.Minute, + Delay: 1 * time.Minute, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "web", @@ -131,6 +139,11 @@ func SystemJob() *structs.Job { &structs.TaskGroup{ Name: "web", Count: 1, + RestartPolicy: &structs.RestartPolicy{ + Attempts: 3, + Interval: 10 * time.Minute, + Delay: 1 * time.Minute, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "web", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f5d20552a..15e58d333 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -16,6 +16,15 @@ import ( var ( ErrNoLeader = fmt.Errorf("No cluster leader") ErrNoRegionPath = fmt.Errorf("No path to region") + defaultServiceJobRestartPolicy = RestartPolicy{ + Delay: 15 * time.Second, + Attempts: 2, + Interval: 1 * time.Minute, + } + defaultBatchJobRestartPolicy = RestartPolicy{ + Delay: 15 * time.Second, + Attempts: 15, + } ) type MessageType uint8 @@ -898,6 +907,33 @@ func (u *UpdateStrategy) Rolling() bool { return u.Stagger > 0 && u.MaxParallel > 0 } +// RestartPolicy influences how Nomad restarts Tasks when they +// crash or fail. +type RestartPolicy struct { + Attempts int + Interval time.Duration + Delay time.Duration +} + +func (r *RestartPolicy) Validate() error { + if time.Duration(r.Attempts)*r.Delay > r.Interval { + return fmt.Errorf("Nomad can't restart the TaskGroup %v times in an interval of %v with a delay of %v", r.Attempts, r.Interval, r.Delay) + } + return nil +} + +func NewRestartPolicy(jobType string) *RestartPolicy { + switch jobType { + case JobTypeService: + rp := defaultServiceJobRestartPolicy + return &rp + case JobTypeBatch: + rp := defaultBatchJobRestartPolicy + return &rp + } + return nil +} + // TaskGroup is an atomic unit of placement. Each task group belongs to // a job and may contain any number of tasks. A task group support running // in many replicas using the same configuration.. 
@@ -913,6 +949,9 @@ type TaskGroup struct { // all the tasks contained. Constraints []*Constraint + // RestartPolicy of a TaskGroup + RestartPolicy *RestartPolicy + // Tasks are the collection of tasks that this task group needs to run Tasks []*Task @@ -940,6 +979,10 @@ func (tg *TaskGroup) Validate() error { } } + if err := tg.RestartPolicy.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + // Check for duplicate tasks tasks := make(map[string]int) for idx, task := range tg.Tasks { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index cabf83dfa..1f107b095 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1,11 +1,11 @@ package structs import ( + "github.com/hashicorp/go-multierror" "reflect" "strings" "testing" - - "github.com/hashicorp/go-multierror" + "time" ) func TestJob_Validate(t *testing.T) { @@ -44,11 +44,27 @@ func TestJob_Validate(t *testing.T) { TaskGroups: []*TaskGroup{ &TaskGroup{ Name: "web", + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, }, &TaskGroup{ Name: "web", + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, + &TaskGroup{ + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, }, - &TaskGroup{}, }, } err = j.Validate() @@ -65,7 +81,13 @@ func TestJob_Validate(t *testing.T) { } func TestTaskGroup_Validate(t *testing.T) { - tg := &TaskGroup{} + tg := &TaskGroup{ + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + } err := tg.Validate() mErr := err.(*multierror.Error) if !strings.Contains(mErr.Errors[0].Error(), "group name") { @@ -86,6 +108,11 @@ func TestTaskGroup_Validate(t *testing.T) { &Task{Name: "web"}, &Task{}, }, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, } 
err = tg.Validate() mErr = err.(*multierror.Error)