Added support for parsing restart blocks
@@ -104,6 +104,17 @@ job "example" {
# Defaults to 1
# count = 1

# Restart Policy - This block defines the restart policy for TaskGroups.
# The attempts value defines the number of restarts Nomad will do if Tasks
# in this TaskGroup fail in a rolling window of interval duration.
# The delay value makes Nomad wait for that duration to restart after a Task
# fails or crashes.
restart {
    interval = 5m
    attempts = 10
    delay = 25s
}

# Define a task to run
task "redis" {
    # Use Docker to run the task.

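For orientation, this stanza corresponds to the RestartPolicy struct this commit adds to nomad/structs (see the structs.go hunk further down). A minimal sketch, not part of the commit, of the equivalent value and why the new Validate method accepts it: 10 restarts times a 25 second delay is 250s, which fits inside the 5 minute interval.

package main

import (
    "fmt"
    "time"

    "github.com/hashicorp/nomad/nomad/structs"
)

func main() {
    // Equivalent of: restart { interval = 5m  attempts = 10  delay = 25s }
    p := &structs.RestartPolicy{
        Attempts: 10,
        Interval: 5 * time.Minute,
        Delay:    25 * time.Second,
    }
    fmt.Println(p.Validate()) // 10 * 25s = 250s <= 5m, so this prints <nil>
}
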
@@ -30,6 +30,7 @@ func Parse(r io.Reader) (*structs.Job, error) {

// Parse the buffer
obj, err := hcl.Parse(buf.String())

if err != nil {
    return nil, fmt.Errorf("error parsing: %s", err)
}
@@ -124,7 +125,7 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error {
        }
    }

-   // If we have tasks outside, do those
+   // If we have tasks outside, create TaskGroups for them
    if o := obj.Get("task", false); o != nil {
        var tasks []*structs.Task
        if err := parseTasks(&tasks, o); err != nil {
@@ -134,9 +135,10 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error {
    result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2)
    for i, t := range tasks {
        result.TaskGroups[i] = &structs.TaskGroup{
-           Name:  t.Name,
-           Count: 1,
-           Tasks: []*structs.Task{t},
+           Name:          t.Name,
+           Count:         1,
+           Tasks:         []*structs.Task{t},
+           RestartPolicy: structs.NewRestartPolicy(result.Type),
        }
    }
}
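(The parse_test.go hunk further down asserts exactly this: the implicit TaskGroup created for the standalone "outside" task now carries a RestartPolicy of 2 attempts, a 1 minute interval and a 15 second delay, which are the service-job defaults from NewRestartPolicy, assuming the test fixture is a service job.)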
@@ -180,6 +182,7 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error {
delete(m, "constraint")
delete(m, "meta")
delete(m, "task")
delete(m, "restart")

// Default count to 1 if not specified
if _, ok := m["count"]; !ok {
@@ -200,6 +203,10 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error {
    }
}

if err := parseRestartPolicy(structs.NewRestartPolicy(result.Type), o); err != nil {
    return err
}

// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
if metaO := o.Get("meta", false); metaO != nil {
@@ -228,6 +235,42 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error {
    return nil
}

func parseRestartPolicy(result *structs.RestartPolicy, obj *hclobj.Object) error {
    var restartHclObj *hclobj.Object
    var m map[string]interface{}
    if restartHclObj = obj.Get("restart", false); restartHclObj == nil {
        return nil
    }
    if err := hcl.DecodeObject(&m, restartHclObj); err != nil {
        return err
    }

    if delay, ok := m["delay"]; ok {
        d, err := toDuration(delay)
        if err != nil {
            return fmt.Errorf("Invalid Delay time in restart policy: %v", err)
        }
        result.Delay = d
    }

    if interval, ok := m["interval"]; ok {
        i, err := toDuration(interval)
        if err != nil {
            return fmt.Errorf("Invalid Interval time in restart policy: %v", err)
        }
        result.Interval = i
    }

    if attempts, ok := m["attempts"]; ok {
        a, err := toInteger(attempts)
        if err != nil {
            return fmt.Errorf("Invalid value in attempts: %v", err)
        }
        result.Attempts = a
    }
    return nil
}

func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
    for _, o := range obj.Elem(false) {
        var m map[string]interface{}
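Because callers seed the policy with structs.NewRestartPolicy (added in the structs.go hunk below) and parseRestartPolicy only overwrites keys that appear in the decoded map, any attribute omitted from a restart block keeps its job-type default. A hedged sketch of that behavior with a hypothetical caller (not code from this commit); o stands for the group's *hclobj.Object:

// Suppose a service job whose restart block sets nothing but attempts = 5.
rp := structs.NewRestartPolicy("service") // Attempts: 2, Interval: 1m, Delay: 15s
if err := parseRestartPolicy(rp, o); err != nil {
    return err
}
// Only rp.Attempts changed (to 5); rp.Interval and rp.Delay keep their
// defaults because those keys were absent from the map.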
@@ -477,3 +520,35 @@ func parseUpdate(result *structs.UpdateStrategy, obj *hclobj.Object) error {
    }
    return nil
}

func toDuration(value interface{}) (time.Duration, error) {
    var dur time.Duration
    var err error
    switch v := value.(type) {
    case string:
        dur, err = time.ParseDuration(v)
    case int:
        dur = time.Duration(v) * time.Second
    default:
        err = fmt.Errorf("Invalid time %s", value)
    }

    return dur, err
}

func toInteger(value interface{}) (int, error) {
    var integer int
    var err error
    switch v := value.(type) {
    case string:
        var i int64
        i, err = strconv.ParseInt(v, 10, 32)
        integer = int(i)
    case int:
        integer = v
    default:
        err = fmt.Errorf("Value: %v can't be parsed into int", value)
    }

    return integer, err
}

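toDuration and toInteger accept either the string form that HCL produces for quoted values or a bare int; anything else falls through to the error branch. A minimal test-style sketch of those shapes, assuming it sits in the jobspec package next to parse.go (the test is illustrative, not part of the commit):

func TestDurationAndIntegerShapes(t *testing.T) {
    if d, err := toDuration("25s"); err != nil || d != 25*time.Second {
        t.Fatalf("string duration: got %v, %v", d, err)
    }
    if d, err := toDuration(25); err != nil || d != 25*time.Second {
        t.Fatalf("bare int should be read as seconds: got %v, %v", d, err)
    }
    if n, err := toInteger("10"); err != nil || n != 10 {
        t.Fatalf("string integer: got %v, %v", n, err)
    }
    if _, err := toDuration(2.5); err == nil {
        t.Fatal("unsupported types should return an error")
    }
}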
@@ -48,6 +48,11 @@ func TestParse(t *testing.T) {
&structs.TaskGroup{
    Name:  "outside",
    Count: 1,
    RestartPolicy: &structs.RestartPolicy{
        Attempts: 2,
        Interval: 1 * time.Minute,
        Delay:    15 * time.Second,
    },
    Tasks: []*structs.Task{
        &structs.Task{
            Name: "outside",

@@ -31,6 +31,11 @@ job "binstore-storagelocker" {

group "binsl" {
    count = 5
    restart {
        attempts = 5
        interval = "10m"
        delay = "15s"
    }
    task "binstore" {
        driver = "docker"
        config {

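(This fixture exercises the string branch of toDuration via the quoted "10m" and "15s" values and, presumably, the int branch of toInteger via the bare attempts = 5.)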
@@ -898,6 +898,37 @@ func (u *UpdateStrategy) Rolling() bool {
    return u.Stagger > 0 && u.MaxParallel > 0
}

// RestartPolicy influences how Nomad restarts Tasks when they
// crash or fail.
type RestartPolicy struct {
    Attempts int
    Interval time.Duration
    Delay    time.Duration
}

func (r *RestartPolicy) Validate() error {
    if time.Duration(r.Attempts)*r.Delay > r.Interval {
        return fmt.Errorf("Nomad can't restart the TaskGroup %v times in an interval of %v with a delay of %v", r.Attempts, r.Interval, r.Delay)
    }
    return nil
}

func NewRestartPolicy(jobType string) *RestartPolicy {
    defaultDelayBetweenRestarts := 15 * time.Second
    defaultAttempts := 15
    var defaultRestartInterval time.Duration

    if jobType == "service" {
        defaultRestartInterval = 1 * time.Minute
        defaultAttempts = 2
    }
    return &RestartPolicy{
        Attempts: defaultAttempts,
        Interval: defaultRestartInterval,
        Delay:    defaultDelayBetweenRestarts,
    }
}

// TaskGroup is an atomic unit of placement. Each task group belongs to
// a job and may contain any number of tasks. A task group supports running
// in many replicas using the same configuration.
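A hedged sketch, not part of the commit, of how the service-job defaults interact with Validate, written as a Go example test inside the structs package (it relies only on the code added above plus fmt):

func ExampleRestartPolicy_defaults() {
    p := NewRestartPolicy("service")
    fmt.Println(p.Attempts, p.Interval, p.Delay, p.Validate() == nil)

    // 5 attempts * 15s delay = 75s no longer fits in the 1m window.
    p.Attempts = 5
    fmt.Println(p.Validate() == nil)
    // Output:
    // 2 1m0s 15s true
    // false
}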
@@ -913,6 +944,9 @@ type TaskGroup struct {
// all the tasks contained.
Constraints []*Constraint

// RestartPolicy of a TaskGroup
RestartPolicy *RestartPolicy

// Tasks are the collection of tasks that this task group needs to run
Tasks []*Task

@@ -940,6 +974,10 @@ func (tg *TaskGroup) Validate() error {
    }
}

if err := tg.RestartPolicy.Validate(); err != nil {
    mErr.Errors = append(mErr.Errors, err)
}

// Check for duplicate tasks
tasks := make(map[string]int)
for idx, task := range tg.Tasks {
@@ -954,6 +992,7 @@ func (tg *TaskGroup) Validate() error {

// Validate the tasks
for idx, task := range tg.Tasks {

    if err := task.Validate(); err != nil {
        outer := fmt.Errorf("Task %d validation failed: %s", idx+1, err)
        mErr.Errors = append(mErr.Errors, outer)