mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 10:55:42 +03:00
Merge pull request #4224 from hashicorp/b-cron-parse
Handle potential panic in cron parsing
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/gorhill/cronexpr"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -537,14 +538,14 @@ func (p *PeriodicConfig) Canonicalize() {
|
||||
// passed time. If no matching instance exists, the zero value of time.Time is
|
||||
// returned. The `time.Location` of the returned value matches that of the
|
||||
// passed time.
|
||||
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
|
||||
func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) {
|
||||
if *p.SpecType == PeriodicSpecCron {
|
||||
if e, err := cronexpr.Parse(*p.Spec); err == nil {
|
||||
return e.Next(fromTime)
|
||||
return structs.CronParseNext(e, fromTime, *p.Spec)
|
||||
}
|
||||
}
|
||||
|
||||
return time.Time{}
|
||||
return time.Time{}, nil
|
||||
}
|
||||
|
||||
func (p *PeriodicConfig) GetLocation() (*time.Location, error) {
|
||||
|
||||
@@ -247,9 +247,13 @@ func (c *JobRunCommand) Run(args []string) int {
|
||||
loc, err := job.Periodic.GetLocation()
|
||||
if err == nil {
|
||||
now := time.Now().In(loc)
|
||||
next := job.Periodic.Next(now)
|
||||
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
|
||||
formatTime(next), formatTimeDifference(now, next, time.Second)))
|
||||
next, err := job.Periodic.Next(now)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error determining next launch time: %v", err))
|
||||
} else {
|
||||
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
|
||||
formatTime(next), formatTimeDifference(now, next, time.Second)))
|
||||
}
|
||||
}
|
||||
} else if !paramjob {
|
||||
c.Ui.Output("Evaluation ID: " + evalID)
|
||||
|
||||
@@ -185,10 +185,12 @@ func (c *JobStatusCommand) Run(args []string) int {
|
||||
location, err := job.Periodic.GetLocation()
|
||||
if err == nil {
|
||||
now := time.Now().In(location)
|
||||
next := job.Periodic.Next(now)
|
||||
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
|
||||
fmt.Sprintf("%s (%s from now)",
|
||||
formatTime(next), formatTimeDifference(now, next, time.Second))))
|
||||
next, err := job.Periodic.Next(now)
|
||||
if err == nil {
|
||||
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
|
||||
fmt.Sprintf("%s (%s from now)",
|
||||
formatTime(next), formatTimeDifference(now, next, time.Second))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -425,7 +425,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
|
||||
// tracking it.
|
||||
if err := n.periodicDispatcher.Add(req.Job); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
|
||||
return err
|
||||
return fmt.Errorf("failed adding job to periodic dispatcher: %v", err)
|
||||
}
|
||||
|
||||
// Create a watch set
|
||||
|
||||
@@ -1217,7 +1217,10 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
|
||||
|
||||
// If it is a periodic job calculate the next launch
|
||||
if args.Job.IsPeriodic() && args.Job.Periodic.Enabled {
|
||||
reply.NextPeriodicLaunch = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
|
||||
reply.NextPeriodicLaunch, err = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse cron expression: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
reply.FailedTGAllocs = updatedEval.FailedTGAllocs
|
||||
|
||||
@@ -379,7 +379,8 @@ func (s *Server) restorePeriodicDispatcher() error {
|
||||
}
|
||||
|
||||
if err := s.periodicDispatcher.Add(job); err != nil {
|
||||
return err
|
||||
s.logger.Printf("[ERR] nomad.periodic: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// We do not need to force run the job since it isn't active.
|
||||
@@ -400,7 +401,11 @@ func (s *Server) restorePeriodicDispatcher() error {
|
||||
}
|
||||
|
||||
// nextLaunch is the next launch that should occur.
|
||||
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
|
||||
nextLaunch, err := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic launch for job %s: %v", job.NamespacedID(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
// We skip force launching the job if there should be no next launch
|
||||
// (the zero case) or if the next launch time is in the future. If it is
|
||||
|
||||
@@ -221,7 +221,10 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
|
||||
|
||||
// Add or update the job.
|
||||
p.tracked[tuple] = job
|
||||
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
|
||||
next, err := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed adding job %s: %v", job.NamespacedID(), err)
|
||||
}
|
||||
if tracked {
|
||||
if err := p.heap.Update(job, next); err != nil {
|
||||
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
|
||||
@@ -344,9 +347,11 @@ func (p *PeriodicDispatch) run(ctx context.Context) {
|
||||
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
|
||||
p.l.Lock()
|
||||
|
||||
nextLaunch := job.Periodic.Next(launchTime)
|
||||
if err := p.heap.Update(job, nextLaunch); err != nil {
|
||||
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q (%s): %v", job.ID, job.Namespace, err)
|
||||
nextLaunch, err := job.Periodic.Next(launchTime)
|
||||
if err != nil {
|
||||
p.logger.Printf("[ERR] nomad.periodic: failed to parse next periodic launch for job %s: %v", job.NamespacedID(), err)
|
||||
} else if err := p.heap.Update(job, nextLaunch); err != nil {
|
||||
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %s: %v", job.NamespacedID(), err)
|
||||
}
|
||||
|
||||
// If the job prohibits overlapping and there are running children, we skip
|
||||
|
||||
@@ -1997,6 +1997,14 @@ type Job struct {
|
||||
JobModifyIndex uint64
|
||||
}
|
||||
|
||||
// NamespacedID returns the namespaced id useful for logging
|
||||
func (j *Job) NamespacedID() *NamespacedID {
|
||||
return &NamespacedID{
|
||||
ID: j.ID,
|
||||
Namespace: j.Namespace,
|
||||
}
|
||||
}
|
||||
|
||||
// Canonicalize is used to canonicalize fields in the Job. This should be called
|
||||
// when registering a Job. A set of warnings are returned if the job was changed
|
||||
// in anyway that the user should be made aware of.
|
||||
@@ -2649,20 +2657,33 @@ func (p *PeriodicConfig) Canonicalize() {
|
||||
p.location = l
|
||||
}
|
||||
|
||||
// CronParseNext is a helper that parses the next time for the given expression
|
||||
// but captures any panic that may occur in the underlying library.
|
||||
func CronParseNext(e *cronexpr.Expression, fromTime time.Time, spec string) (t time.Time, err error) {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
t = time.Time{}
|
||||
err = fmt.Errorf("failed parsing cron expression: %q", spec)
|
||||
}
|
||||
}()
|
||||
|
||||
return e.Next(fromTime), nil
|
||||
}
|
||||
|
||||
// Next returns the closest time instant matching the spec that is after the
|
||||
// passed time. If no matching instance exists, the zero value of time.Time is
|
||||
// returned. The `time.Location` of the returned value matches that of the
|
||||
// passed time.
|
||||
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
|
||||
func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) {
|
||||
switch p.SpecType {
|
||||
case PeriodicSpecCron:
|
||||
if e, err := cronexpr.Parse(p.Spec); err == nil {
|
||||
return e.Next(fromTime)
|
||||
return CronParseNext(e, fromTime, p.Spec)
|
||||
}
|
||||
case PeriodicSpecTest:
|
||||
split := strings.Split(p.Spec, ",")
|
||||
if len(split) == 1 && split[0] == "" {
|
||||
return time.Time{}
|
||||
return time.Time{}, nil
|
||||
}
|
||||
|
||||
// Parse the times
|
||||
@@ -2670,7 +2691,7 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
|
||||
for i, s := range split {
|
||||
unix, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return time.Time{}
|
||||
return time.Time{}, nil
|
||||
}
|
||||
|
||||
times[i] = time.Unix(int64(unix), 0)
|
||||
@@ -2679,12 +2700,12 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
|
||||
// Find the next match
|
||||
for _, next := range times {
|
||||
if fromTime.Before(next) {
|
||||
return next
|
||||
return next, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return time.Time{}
|
||||
return time.Time{}, nil
|
||||
}
|
||||
|
||||
// GetLocation returns the location to use for determining the time zone to run
|
||||
|
||||
@@ -2009,15 +2009,44 @@ func TestPeriodicConfig_ValidCron(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPeriodicConfig_NextCron(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
type testExpectation struct {
|
||||
Time time.Time
|
||||
HasError bool
|
||||
ErrorMsg string
|
||||
}
|
||||
|
||||
from := time.Date(2009, time.November, 10, 23, 22, 30, 0, time.UTC)
|
||||
specs := []string{"0 0 29 2 * 1980", "*/5 * * * *"}
|
||||
expected := []time.Time{{}, time.Date(2009, time.November, 10, 23, 25, 0, 0, time.UTC)}
|
||||
specs := []string{"0 0 29 2 * 1980",
|
||||
"*/5 * * * *",
|
||||
"1 15-0 * * 1-5"}
|
||||
expected := []*testExpectation{
|
||||
{
|
||||
Time: time.Time{},
|
||||
HasError: false,
|
||||
},
|
||||
{
|
||||
Time: time.Date(2009, time.November, 10, 23, 25, 0, 0, time.UTC),
|
||||
HasError: false,
|
||||
},
|
||||
{
|
||||
Time: time.Time{},
|
||||
HasError: true,
|
||||
ErrorMsg: "failed parsing cron expression",
|
||||
},
|
||||
}
|
||||
|
||||
for i, spec := range specs {
|
||||
p := &PeriodicConfig{Enabled: true, SpecType: PeriodicSpecCron, Spec: spec}
|
||||
p.Canonicalize()
|
||||
n := p.Next(from)
|
||||
if expected[i] != n {
|
||||
t.Fatalf("Next(%v) returned %v; want %v", from, n, expected[i])
|
||||
n, err := p.Next(from)
|
||||
nextExpected := expected[i]
|
||||
|
||||
require.Equal(nextExpected.Time, n)
|
||||
require.Equal(err != nil, nextExpected.HasError)
|
||||
if err != nil {
|
||||
require.True(strings.Contains(err.Error(), nextExpected.ErrorMsg))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2034,6 +2063,8 @@ func TestPeriodicConfig_ValidTimeZone(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPeriodicConfig_DST(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
// On Sun, Mar 12, 2:00 am 2017: +1 hour UTC
|
||||
p := &PeriodicConfig{
|
||||
Enabled: true,
|
||||
@@ -2050,15 +2081,14 @@ func TestPeriodicConfig_DST(t *testing.T) {
|
||||
e1 := time.Date(2017, time.March, 11, 10, 0, 0, 0, time.UTC)
|
||||
e2 := time.Date(2017, time.March, 12, 9, 0, 0, 0, time.UTC)
|
||||
|
||||
n1 := p.Next(t1).UTC()
|
||||
n2 := p.Next(t2).UTC()
|
||||
n1, err := p.Next(t1)
|
||||
require.Nil(err)
|
||||
|
||||
if !reflect.DeepEqual(e1, n1) {
|
||||
t.Fatalf("Got %v; want %v", n1, e1)
|
||||
}
|
||||
if !reflect.DeepEqual(e2, n2) {
|
||||
t.Fatalf("Got %v; want %v", n1, e1)
|
||||
}
|
||||
n2, err := p.Next(t2)
|
||||
require.Nil(err)
|
||||
|
||||
require.Equal(e1, n1.UTC())
|
||||
require.Equal(e2, n2.UTC())
|
||||
}
|
||||
|
||||
func TestRestartPolicy_Validate(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user