diff --git a/command/init.go b/command/init.go index 0b9be934b..8827f5e9d 100644 --- a/command/init.go +++ b/command/init.go @@ -104,6 +104,17 @@ job "example" { # Defaults to 1 # count = 1 + # Restart Policy - This block defines the restart policy for TaskGroups + # attempts defines the number of restarts Nomad will do if Tasks + # in this TaskGroup fail within a rolling window of interval duration + # The delay value makes Nomad wait for that duration to restart after a Task + # fails or crashes. + restart { + interval = "5m" + attempts = 10 + delay = "25s" + } + # Define a task to run task "redis" { # Use Docker to run the task. diff --git a/jobspec/parse.go b/jobspec/parse.go index f63ac5294..c3c71ac9c 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -30,6 +30,7 @@ func Parse(r io.Reader) (*structs.Job, error) { // Parse the buffer obj, err := hcl.Parse(buf.String()) + if err != nil { return nil, fmt.Errorf("error parsing: %s", err) } @@ -124,7 +125,7 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error { } } - // If we have tasks outside, do those + // If we have tasks outside, create TaskGroups for them if o := obj.Get("task", false); o != nil { var tasks []*structs.Task if err := parseTasks(&tasks, o); err != nil { @@ -134,9 +135,10 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error { result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2) for i, t := range tasks { result.TaskGroups[i] = &structs.TaskGroup{ - Name: t.Name, - Count: 1, - Tasks: []*structs.Task{t}, + Name: t.Name, + Count: 1, + Tasks: []*structs.Task{t}, + RestartPolicy: structs.NewRestartPolicy(result.Type), } } } @@ -180,6 +182,7 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error { delete(m, "constraint") delete(m, "meta") delete(m, "task") + delete(m, "restart") // Default count to 1 if not specified if _, ok := m["count"]; !ok { @@ -200,6 +203,10 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error { } } + if err :=
func() error { + g.RestartPolicy = structs.NewRestartPolicy(result.Type) + return parseRestartPolicy(g.RestartPolicy, o) + }(); err != nil { return err } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := o.Get("meta", false); metaO != nil { @@ -228,6 +235,42 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error { return nil } +func parseRestartPolicy(result *structs.RestartPolicy, obj *hclobj.Object) error { + var restartHclObj *hclobj.Object + var m map[string]interface{} + if restartHclObj = obj.Get("restart", false); restartHclObj == nil { + return nil + } + if err := hcl.DecodeObject(&m, restartHclObj); err != nil { + return err + } + + if delay, ok := m["delay"]; ok { + d, err := toDuration(delay) + if err != nil { + return fmt.Errorf("Invalid Delay time in restart policy: %v", err) + } + result.Delay = d + } + + if interval, ok := m["interval"]; ok { + i, err := toDuration(interval) + if err != nil { + return fmt.Errorf("Invalid Interval time in restart policy: %v", err) + } + result.Interval = i + } + + if attempts, ok := m["attempts"]; ok { + a, err := toInteger(attempts) + if err != nil { + return fmt.Errorf("Invalid value in attempts: %v", err) + } + result.Attempts = a + } + return nil +} + func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error { for _, o := range obj.Elem(false) { var m map[string]interface{} @@ -477,3 +520,35 @@ func parseUpdate(result *structs.UpdateStrategy, obj *hclobj.Object) error { } return nil } + +func toDuration(value interface{}) (time.Duration, error) { + var dur time.Duration + var err error + switch v := value.(type) { + case string: + dur, err = time.ParseDuration(v) + case int: + dur = time.Duration(v) * time.Second + default: + err = fmt.Errorf("Invalid time %v", value) + } + + return dur, err +} + +func toInteger(value interface{}) (int, error) { + var integer int + var err error + switch v := value.(type) { + case string: + var i int64 + i, err = strconv.ParseInt(v, 10, 32) + 
integer = int(i) + case int: + integer = v + default: + err = fmt.Errorf("Value: %v can't be parsed into int", value) + } + + return integer, err +} diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index f91789ddb..c3b91e785 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -48,6 +48,11 @@ func TestParse(t *testing.T) { &structs.TaskGroup{ Name: "outside", Count: 1, + RestartPolicy: &structs.RestartPolicy{ + Attempts: 2, + Interval: 1 * time.Minute, + Delay: 15 * time.Second, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "outside", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 941272b2d..bf81a6ae7 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -31,6 +31,11 @@ job "binstore-storagelocker" { group "binsl" { count = 5 + restart { + attempts = 5 + interval = "10m" + delay = "15s" + } task "binstore" { driver = "docker" config { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f5d20552a..8afe1c452 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -898,6 +898,37 @@ func (u *UpdateStrategy) Rolling() bool { return u.Stagger > 0 && u.MaxParallel > 0 } +// RestartPolicy influences how Nomad restarts Tasks when they +// crash or fail. 
+type RestartPolicy struct { + Attempts int + Interval time.Duration + Delay time.Duration +} + +func (r *RestartPolicy) Validate() error { + if time.Duration(r.Attempts)*r.Delay > r.Interval { + return fmt.Errorf("Nomad can't restart the TaskGroup %v times in an interval of %v with a delay of %v", r.Attempts, r.Interval, r.Delay) + } + return nil +} + +func NewRestartPolicy(jobType string) *RestartPolicy { + defaultDelayBetweenRestarts := 15 * time.Second + defaultAttempts := 15 + defaultRestartInterval := 7 * 24 * time.Hour + + if jobType == "service" { + defaultRestartInterval = 1 * time.Minute + defaultAttempts = 2 + } + return &RestartPolicy{ + Attempts: defaultAttempts, + Interval: defaultRestartInterval, + Delay: defaultDelayBetweenRestarts, + } +} + // TaskGroup is an atomic unit of placement. Each task group belongs to // a job and may contain any number of tasks. A task group support running // in many replicas using the same configuration.. @@ -913,6 +944,9 @@ type TaskGroup struct { // all the tasks contained. Constraints []*Constraint + // RestartPolicy of a TaskGroup + RestartPolicy *RestartPolicy + // Tasks are the collection of tasks that this task group needs to run Tasks []*Task @@ -940,6 +974,10 @@ func (tg *TaskGroup) Validate() error { } } + if err := tg.RestartPolicy.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + // Check for duplicate tasks tasks := make(map[string]int) for idx, task := range tg.Tasks { @@ -954,6 +992,7 @@ func (tg *TaskGroup) Validate() error { // Validate the tasks for idx, task := range tg.Tasks { + if err := task.Validate(); err != nil { outer := fmt.Errorf("Task %d validation failed: %s", idx+1, err) mErr.Errors = append(mErr.Errors, outer)