From 7f989499ffb4bfc5eca31531ade7c385cfc77a56 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 1 Mar 2018 11:21:32 -0800 Subject: [PATCH] Correct defaulting --- api/jobs.go | 1 + api/tasks.go | 44 +++++++- api/tasks_test.go | 152 ++++++++++++++++++++++++++ command/agent/job_endpoint.go | 9 ++ command/agent/job_endpoint_test.go | 12 ++ jobspec/parse.go | 19 +++- jobspec/parse_test.go | 38 +++++++ jobspec/test-fixtures/migrate-job.hcl | 28 +++++ 8 files changed, 297 insertions(+), 6 deletions(-) create mode 100644 jobspec/test-fixtures/migrate-job.hcl diff --git a/api/jobs.go b/api/jobs.go index 9e3227af4..5fcecf403 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -559,6 +559,7 @@ type Job struct { ParameterizedJob *ParameterizedJobConfig Payload []byte Reschedule *ReschedulePolicy + Migrate *MigrateStrategy Meta map[string]string VaultToken *string `mapstructure:"vault_token"` Status *string diff --git a/api/tasks.go b/api/tasks.go index f7d3d9fb0..47b502d57 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -321,6 +321,30 @@ func (m *MigrateStrategy) Canonicalize() { } } +func (m *MigrateStrategy) Merge(o *MigrateStrategy) { + if o.MaxParallel != nil { + m.MaxParallel = o.MaxParallel + } + if o.HealthCheck != nil { + m.HealthCheck = o.HealthCheck + } + if o.MinHealthyTime != nil { + m.MinHealthyTime = o.MinHealthyTime + } + if o.HealthyDeadline != nil { + m.HealthyDeadline = o.HealthyDeadline + } +} + +func (m *MigrateStrategy) Copy() *MigrateStrategy { + if m == nil { + return nil + } + nm := new(MigrateStrategy) + *nm = *m + return nm +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { Name *string @@ -415,7 +439,25 @@ func (g *TaskGroup) Canonicalize(job *Job) { } g.ReschedulePolicy = defaultReschedulePolicy - g.Migrate.Canonicalize() + // Merge the migrate strategy from the job + if jm, tm := job.Migrate != nil, g.Migrate != nil; jm && tm { + jobMigrate := job.Migrate.Copy() + jobMigrate.Merge(g.Migrate) + g.Migrate = jobMigrate + } else if jm { + jobMigrate := job.Migrate.Copy() + g.Migrate = jobMigrate + } + + // Merge with default reschedule policy + if *job.Type == "service" { + defaultMigrateStrategy := &MigrateStrategy{} + defaultMigrateStrategy.Canonicalize() + if g.Migrate != nil { + defaultMigrateStrategy.Merge(g.Migrate) + } + g.Migrate = defaultMigrateStrategy + } var defaultRestartPolicy *RestartPolicy switch *job.Type { diff --git a/api/tasks_test.go b/api/tasks_test.go index 3280507ad..d72acc179 100644 --- a/api/tasks_test.go +++ b/api/tasks_test.go @@ -430,6 +430,158 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) { } } +// Verifies that migrate strategy is merged correctly +func TestTaskGroup_Canonicalize_MigrateStrategy(t *testing.T) { + type testCase struct { + desc string + jobType string + jobMigrate *MigrateStrategy + taskMigrate *MigrateStrategy + expected *MigrateStrategy + } + + testCases := []testCase{ + { + desc: "Default batch", + jobType: "batch", + jobMigrate: nil, + taskMigrate: nil, + expected: nil, + }, + { + desc: "Default service", + jobType: "service", + jobMigrate: nil, + taskMigrate: nil, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(1), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(10 * time.Second), + HealthyDeadline: helper.TimeToPtr(5 * time.Minute), + }, + }, + { + desc: "Empty job migrate strategy", + jobType: "service", + jobMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(0), + HealthCheck: helper.StringToPtr(""), + MinHealthyTime: helper.TimeToPtr(0), + HealthyDeadline: helper.TimeToPtr(0), + }, + taskMigrate: nil, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(0), + HealthCheck: helper.StringToPtr(""), + MinHealthyTime: helper.TimeToPtr(0), + HealthyDeadline: helper.TimeToPtr(0), + }, + }, + { + desc: "Inherit from job", + jobType: "service", + jobMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(3), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + taskMigrate: nil, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(3), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + }, + { + desc: "Set in task", + jobType: "service", + jobMigrate: nil, + taskMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(3), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(3), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + }, + { + desc: "Merge from job", + jobType: "service", + jobMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(11), + }, + taskMigrate: &MigrateStrategy{ + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(11), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + }, + { + desc: "Override from group", + jobType: "service", + jobMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(11), + }, + taskMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(5), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(5), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(2), + HealthyDeadline: helper.TimeToPtr(2), + }, + }, + { + desc: "Parallel from job, defaulting", + jobType: "service", + jobMigrate: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(5), + }, + taskMigrate: nil, + expected: &MigrateStrategy{ + MaxParallel: helper.IntToPtr(5), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(10 * time.Second), + HealthyDeadline: helper.TimeToPtr(5 * time.Minute), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + job := &Job{ + ID: helper.StringToPtr("test"), + Migrate: tc.jobMigrate, + Type: helper.StringToPtr(tc.jobType), + } + job.Canonicalize() + tg := &TaskGroup{ + Name: helper.StringToPtr("foo"), + Migrate: tc.taskMigrate, + } + tg.Canonicalize(job) + assert.Equal(t, tc.expected, tg.Migrate) + }) + } +} + // TestService_CheckRestart asserts Service.CheckRestart settings are properly // inherited by Checks. func TestService_CheckRestart(t *testing.T) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 840fb1fee..ce1605728 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -649,6 +649,15 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { } } + if taskGroup.Migrate != nil { + tg.Migrate = &structs.MigrateStrategy{ + MaxParallel: *taskGroup.Migrate.MaxParallel, + HealthCheck: *taskGroup.Migrate.HealthCheck, + MinHealthyTime: *taskGroup.Migrate.MinHealthyTime, + HealthyDeadline: *taskGroup.Migrate.HealthyDeadline, + } + } + tg.EphemeralDisk = &structs.EphemeralDisk{ Sticky: *taskGroup.EphemeralDisk.Sticky, SizeMB: *taskGroup.EphemeralDisk.SizeMB, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index f59acaaf2..57b5d1869 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1179,6 +1179,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Unlimited: helper.BoolToPtr(true), MaxDelay: helper.TimeToPtr(20 * time.Minute), }, + Migrate: &api.MigrateStrategy{ + MaxParallel: helper.IntToPtr(12), + HealthCheck: helper.StringToPtr("task_events"), + MinHealthyTime: helper.TimeToPtr(12 * time.Hour), + HealthyDeadline: helper.TimeToPtr(12 * time.Hour), + }, EphemeralDisk: &api.EphemeralDisk{ SizeMB: helper.IntToPtr(100), Sticky: helper.BoolToPtr(true), @@ -1395,6 +1401,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Unlimited: true, MaxDelay: 20 * time.Minute, }, + Migrate: &structs.MigrateStrategy{ + MaxParallel: 12, + HealthCheck: "task_events", + MinHealthyTime: 12 * time.Hour, + HealthyDeadline: 12 * time.Hour, + }, EphemeralDisk: &structs.EphemeralDisk{ SizeMB: 100, Sticky: true, diff --git a/jobspec/parse.go b/jobspec/parse.go index 4bfebc909..e56161cd4 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -104,11 +104,12 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { } delete(m, "constraint") delete(m, "meta") - delete(m, "update") - delete(m, "periodic") - delete(m, "vault") + delete(m, "migrate") delete(m, "parameterized") + delete(m, "periodic") delete(m, "reschedule") + delete(m, "update") + delete(m, "vault") // Set the ID and name to the object key result.ID = helper.StringToPtr(obj.Keys[0].Token.Value().(string)) @@ -132,19 +133,20 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { "all_at_once", "constraint", "datacenters", - "parameterized", "group", "id", "meta", + "migrate", "name", "namespace", + "parameterized", "periodic", "priority", "region", + "reschedule", "task", "type", "update", - "reschedule", "vault", "vault_token", } @@ -187,6 +189,13 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { } } + // If we have a migration strategy, then parse that + if o := listVal.Filter("migrate"); len(o.Items) > 0 { + if err := parseMigrate(&result.Migrate, o); err != nil { + return multierror.Prefix(err, "migrate ->") + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index c3989a68c..1275cd51c 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -741,6 +741,44 @@ func TestParse(t *testing.T) { }, false, }, + { + "migrate-job.hcl", + &api.Job{ + ID: helper.StringToPtr("foo"), + Name: helper.StringToPtr("foo"), + Type: helper.StringToPtr("batch"), + Datacenters: []string{"dc1"}, + Migrate: &api.MigrateStrategy{ + MaxParallel: helper.IntToPtr(2), + HealthCheck: helper.StringToPtr("task_states"), + MinHealthyTime: helper.TimeToPtr(11 * time.Second), + HealthyDeadline: helper.TimeToPtr(11 * time.Minute), + }, + TaskGroups: []*api.TaskGroup{ + { + Name: helper.StringToPtr("bar"), + Count: helper.IntToPtr(3), + Migrate: &api.MigrateStrategy{ + MaxParallel: helper.IntToPtr(3), + HealthCheck: helper.StringToPtr("checks"), + MinHealthyTime: helper.TimeToPtr(1 * time.Second), + HealthyDeadline: helper.TimeToPtr(1 * time.Minute), + }, + Tasks: []*api.Task{ + { + Name: "bar", + Driver: "raw_exec", + Config: map[string]interface{}{ + "command": "bash", + "args": []interface{}{"-c", "echo hi"}, + }, + }, + }, + }, + }, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/migrate-job.hcl b/jobspec/test-fixtures/migrate-job.hcl new file mode 100644 index 000000000..5ec05e6b5 --- /dev/null +++ b/jobspec/test-fixtures/migrate-job.hcl @@ -0,0 +1,28 @@ +job "foo" { + datacenters = ["dc1"] + type = "batch" + migrate { + max_parallel = 2 + health_check = "task_states" + min_healthy_time = "11s" + healthy_deadline = "11m" + } + + group "bar" { + count = 3 + task "bar" { + driver = "raw_exec" + config { + command = "bash" + args = ["-c", "echo hi"] + } + } + + migrate { + max_parallel = 3 + health_check = "checks" + min_healthy_time = "1s" + healthy_deadline = "1m" + } + } +}