Only upsert a job if the spec changes and push deployment creation into reconciler

Alex Dadgar
2017-05-22 17:02:20 -07:00
parent df40bd8a5f
commit c16195a2ea
6 changed files with 192 additions and 36 deletions

View File

@@ -142,15 +142,20 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Clear the Vault token
args.Job.VaultToken = ""
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err)
return err
}
// Check if the job has changed at all
if currentJob == nil || currentJob.SpecChanged(args.Job) {
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err)
return err
}
// Populate the reply with job information
reply.JobModifyIndex = index
// Populate the reply with job information
reply.JobModifyIndex = index
} else {
reply.JobModifyIndex = currentJob.JobModifyIndex
}
// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
@@ -164,7 +169,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
- JobModifyIndex: index,
+ JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{

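A note on the index plumbing in the hunk above: when the spec is unchanged the Raft apply is skipped, so there is no fresh index in scope, which is why the evaluation now takes reply.JobModifyIndex (either the new Raft index or the stored job's JobModifyIndex). The sketch below is a minimal standalone illustration of that gate; job, registerReply, and register are simplified stand-ins invented here, not Nomad's real types.

package main

import "fmt"

// Simplified stand-ins for the RPC types used by Job.Register above.
type job struct {
    Priority       int
    JobModifyIndex uint64
}

type registerReply struct {
    JobModifyIndex uint64
}

// register mirrors the new gate: only commit through Raft when the spec
// differs from the stored job, otherwise reuse the stored modify index.
func register(current *job, submitted job, raftIndex uint64) registerReply {
    var reply registerReply
    if current == nil || current.Priority != submitted.Priority { // stand-in for SpecChanged
        // A Raft apply would run here and return a fresh index.
        reply.JobModifyIndex = raftIndex
    } else {
        // No write happened, so the evaluation must reference the existing index.
        reply.JobModifyIndex = current.JobModifyIndex
    }
    return reply
}

func main() {
    stored := &job{Priority: 50, JobModifyIndex: 200}
    fmt.Println(register(stored, job{Priority: 50}, 300)) // {200}: spec unchanged, no new write
    fmt.Println(register(stored, job{Priority: 99}, 300)) // {300}: spec changed, fresh Raft index
}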
View File

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
)
func TestJobEndpoint_Register(t *testing.T) {
@@ -232,6 +233,9 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
if out.Priority != 100 {
t.Fatalf("expected update")
}
if out.Version != 1 {
t.Fatalf("expected update")
}
// Lookup the evaluation
eval, err := state.EvalByID(ws, resp.EvalID)
@@ -263,6 +267,28 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check to ensure the job version didn't get bumped because we submitted
// the same job
state = s1.fsm.State()
ws = memdb.NewWatchSet()
out, err = state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.Version != 1 {
t.Fatalf("expected no update; got %v; diff %v", out.Version, pretty.Diff(job2, out))
}
}
func TestJobEndpoint_Register_Periodic(t *testing.T) {

View File

@@ -1627,6 +1627,47 @@ func (j *Job) RequiredSignals() map[string]map[string][]string {
return signals
}
// SpecChanged determines if the functional specification has changed between
// two job versions. The new job will be mutated but returned to its original
// value before returning, thus concurrent access to the job should be blocked.
func (j *Job) SpecChanged(new *Job) bool {
if j == nil {
return new != nil
}
// Capture the original mutable values so they can be restored.
oStatus := new.Status
oStatusDescription := new.StatusDescription
oStable := new.Stable
oVersion := new.Version
oCreateIndex := new.CreateIndex
oModifyIndex := new.ModifyIndex
oJobModifyIndex := new.JobModifyIndex
// Update the new job so we can do a reflect.DeepEqual comparison
new.Status = j.Status
new.StatusDescription = j.StatusDescription
new.Stable = j.Stable
new.Version = j.Version
new.CreateIndex = j.CreateIndex
new.ModifyIndex = j.ModifyIndex
new.JobModifyIndex = j.JobModifyIndex
// Deep equals the jobs
equal := reflect.DeepEqual(j, new)
// Restore the new job's values
new.Status = oStatus
new.StatusDescription = oStatusDescription
new.Stable = oStable
new.Version = oVersion
new.CreateIndex = oCreateIndex
new.ModifyIndex = oModifyIndex
new.JobModifyIndex = oJobModifyIndex
return !equal
}
// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {

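The comparison trick in SpecChanged (temporarily copy the mutable bookkeeping fields from the old job onto the new one, reflect.DeepEqual the rest, then restore) generalizes to any struct that mixes functional spec with mutable state. Here is a self-contained sketch of the same pattern on a hypothetical spec type; it mirrors the technique, not Nomad's actual struct, and carries the same caveat as the doc comment above: the new value is briefly mutated, so concurrent readers must be excluded.

package main

import (
    "fmt"
    "reflect"
)

// spec is a hypothetical stand-in: Replicas is functional spec, while Status
// and ModifyIndex are mutable bookkeeping that should not count as a change.
type spec struct {
    Replicas    int
    Status      string
    ModifyIndex uint64
}

// specChanged follows the same save/overwrite/compare/restore pattern as
// Job.SpecChanged above.
func specChanged(old, new *spec) bool {
    if old == nil {
        return new != nil
    }
    // Capture the new value's mutable fields so they can be restored.
    oStatus, oModify := new.Status, new.ModifyIndex
    // Overwrite them with the old value's fields so only the spec is compared.
    new.Status, new.ModifyIndex = old.Status, old.ModifyIndex
    equal := reflect.DeepEqual(old, new)
    // Restore the new value's original fields.
    new.Status, new.ModifyIndex = oStatus, oModify
    return !equal
}

func main() {
    old := &spec{Replicas: 3, Status: "running", ModifyIndex: 10}
    same := &spec{Replicas: 3, Status: "pending", ModifyIndex: 42} // only mutable state differs
    diff := &spec{Replicas: 5, Status: "pending", ModifyIndex: 42} // functional change
    fmt.Println(specChanged(old, same)) // false
    fmt.Println(specChanged(old, diff)) // true
}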
View File

@@ -295,6 +295,48 @@ func TestJob_Canonicalize_Update(t *testing.T) {
}
}
func TestJob_SpecChanged(t *testing.T) {
// Get a base test job
base := testJob()
// Only modify the indexes/mutable state of the job
mutatedBase := base.Copy()
mutatedBase.Status = "foo"
mutatedBase.ModifyIndex = base.ModifyIndex + 100
// changed contains a spec change that should be detected
change := base.Copy()
change.Priority = 99
cases := []struct {
Name string
Original *Job
New *Job
Changed bool
}{
{
Name: "Same job except mutable indexes",
Changed: false,
Original: base,
New: mutatedBase,
},
{
Name: "Different",
Changed: true,
Original: base,
New: change,
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
if actual := c.Original.SpecChanged(c.New); actual != c.Changed {
t.Fatalf("SpecChanged() returned %v; want %v", actual, c.Changed)
}
})
}
}
func testJob() *Job {
return &Job{
Region: "global",

View File

@@ -390,6 +390,11 @@ func (s *GenericScheduler) computeJobAllocs2() error {
s.plan.CreatedDeployment = results.createDeployment
s.plan.DeploymentUpdates = results.deploymentUpdates
// Update the stored deployment
if results.createDeployment != nil {
s.deployment = results.createDeployment
}
// Handle the stop
for _, stop := range results.stop {
s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus)

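For context on the nil check above: the reconciler only returns createDeployment when it actually created one, so the scheduler adopts it in that case and otherwise keeps whatever deployment it already holds. A tiny sketch of that hand-off follows; deployment, reconcileResults, and scheduler are illustrative stand-ins, not the real scheduler types.

package main

import "fmt"

// Illustrative stand-ins for the scheduler state and reconciler results.
type deployment struct{ ID string }

type reconcileResults struct{ createDeployment *deployment }

type scheduler struct{ deployment *deployment }

// adoptDeployment mirrors the hunk above: only overwrite the stored
// deployment when the reconciler proposed a new one.
func (s *scheduler) adoptDeployment(results *reconcileResults) {
    if results.createDeployment != nil {
        s.deployment = results.createDeployment
    }
}

func main() {
    s := &scheduler{}
    s.adoptDeployment(&reconcileResults{createDeployment: &deployment{ID: "d1"}})
    fmt.Println(s.deployment.ID) // d1: the scheduler now tracks the deployment the plan will create
    s.adoptDeployment(&reconcileResults{})
    fmt.Println(s.deployment.ID) // still d1: a nil result leaves the stored deployment untouched
}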
View File

@@ -106,7 +106,8 @@ func NewAllocReconciler(ctx Context, stack Stack, batch bool,
func (a *allocReconciler) Compute() *reconcileResults {
// If we are just stopping a job we do not need to do anything more than
// stopping all running allocs
- if a.job == nil || a.job.Stop {
+ stopped := a.job == nil || a.job.Stop
+ if stopped {
a.handleStop()
// Cancel the deployment since it is not needed
@@ -121,6 +122,29 @@ func (a *allocReconciler) Compute() *reconcileResults {
return a.result
}
// Check if the deployment is referencing an older job
if d := a.deployment; d != nil {
if d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusCancelled,
StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
})
a.deployment = nil
}
}
// Create a new deployment
if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() {
a.deployment = structs.NewDeployment(a.job)
a.result.createDeployment = a.deployment
a.ctx.Logger().Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
}
if a.deployment != nil {
a.ctx.Logger().Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
}
m := newAllocMatrix(a.job, a.existingAllocs)
for group, as := range m {
a.computeGroup(group, as)
@@ -225,13 +249,6 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
_, inplace, destructive := a.computeUpdates(tg, untainted)
a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
- // XXX Not clear if this is needed
- // Update untainted so that it contains all existing allocations that have
- // been inplace updated or do not have to be updated and does not include
- // any canaries.
- //untainted = untainted.difference(destructive)
- //a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
// Get the update strategy of the group
strategy, update := a.groupUpdateStrategy[group]
@@ -239,8 +256,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// The fact that we have destructive updates and have less canaries than is
// desired means we need to create canaries
- requireCanary := len(destructive) != 0 && update && strategy.Canary != 0 && len(canaries) < strategy.Canary
- if requireCanary && !a.deploymentPaused {
+ requireCanary := len(destructive) != 0 && update && len(canaries) < strategy.Canary
+ placeCanaries := requireCanary && !a.deploymentPaused
+ if placeCanaries {
a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", strategy.Canary-len(canaries))
for i := len(canaries); i < strategy.Canary; i++ {
a.result.place = append(a.result.place, allocPlaceResult{
@@ -253,12 +271,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
// Determine how many we can place
- limit := tg.Count
- if update {
-     // XXX This is wrong. Need to detect health first. Probably only within
-     // the truly untainted set
-     limit = strategy.MaxParallel
- }
+ haveCanaries := len(canaries) != 0 || placeCanaries
+ limit := a.computeLimit(tg, strategy, untainted, haveCanaries)
a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit)
// Place if:
@@ -299,18 +313,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
limit -= min
}
- // Migrations should be done under the rolling update strategy, however we
- // do not abide by the paused state of the deployment since this could block
- // node draining.
- min := helper.IntMin(len(migrate), limit)
- i := 0
- a.ctx.Logger().Printf("RECONCILER -- Migrating (%d)", min)
+ // TODO Migrations should be done using a stagger and max_parallel.
+ a.ctx.Logger().Printf("RECONCILER -- Migrating (%d)", len(migrate))
for _, alloc := range migrate {
-     if i == min {
-         break
-     }
-     i++
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocMigrating,
@@ -324,6 +329,38 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
}
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *structs.UpdateStrategy, untainted allocSet, canaries bool) int {
// If there is no update strategy or deployment for the group we can deploy
// as many as the group has
if strategy == nil || a.deployment == nil {
return group.Count
} else if a.deploymentPaused {
// If the deployment is paused, do not create anything else
return 0
}
// Get the state of the deployment for the group
deploymentState := a.deployment.TaskGroups[group.Name]
// If we have canaries and they have not been promoted the limit is 0
if canaries && (deploymentState == nil || !deploymentState.Promoted) {
return 0
}
// If we have been promoted or there are no canaries, the limit is the
// configured MaxParallel - any outstanding non-healthy alloc for the
// deployment
limit := strategy.MaxParallel
partOf, _ := untainted.filterByDeployment(a.deployment.ID)
for _, alloc := range partOf {
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.Healthy == nil {
limit--
}
}
return limit
}
func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted, destructiveUpdates allocSet) []allocPlaceResult {
// Hot path the nothing to do case
existing := len(untainted) + len(destructiveUpdates)
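The limit arithmetic in computeLimit is easiest to see with numbers: the budget starts at MaxParallel and every allocation in the deployment whose health is still undetermined consumes one slot, while a paused deployment or unpromoted canaries collapse the budget to zero. Below is a minimal standalone sketch; alloc and rollingLimit are simplified illustrations invented here, not the scheduler's real types.

package main

import "fmt"

// alloc is a simplified stand-in: healthy is nil until the allocation's
// deployment health has been determined.
type alloc struct{ healthy *bool }

// rollingLimit reproduces the computeLimit arithmetic: MaxParallel minus every
// alloc in the deployment without a health determination, and zero while the
// deployment is paused or canaries have not been promoted.
func rollingLimit(maxParallel int, paused, unpromotedCanaries bool, partOf []alloc) int {
    if paused || unpromotedCanaries {
        return 0
    }
    limit := maxParallel
    for _, a := range partOf {
        if a.healthy == nil {
            limit--
        }
    }
    return limit
}

func main() {
    healthy := true
    partOf := []alloc{{healthy: &healthy}, {healthy: nil}, {healthy: nil}}
    // MaxParallel=3 with two allocations still awaiting health: room for one more update.
    fmt.Println(rollingLimit(3, false, false, partOf)) // 1
    // Unpromoted canaries gate all other placements.
    fmt.Println(rollingLimit(3, false, true, partOf)) // 0
}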