From 7c2e59f32aa8b61b6cf47fe2ef2ec1f09f50d0bc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 11 May 2017 12:49:04 -0700 Subject: [PATCH] Plan allows updating the status of deployments --- nomad/plan_apply.go | 1 + nomad/state/state_store.go | 39 ++++++++++++++- nomad/state/state_store_test.go | 84 ++++++++++++++++++++++++++++++++- nomad/structs/structs.go | 26 +++++++++- 4 files changed, 146 insertions(+), 4 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f18842fb4..a3f7e7421 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -136,6 +136,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap Alloc: make([]*structs.Allocation, 0, minUpdates), }, CreatedDeployment: plan.CreatedDeployment, + DeploymentUpdates: plan.DeploymentUpdates, } for _, updateList := range result.NodeUpdate { req.Alloc = append(req.Alloc, updateList...) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index e9cf216bf..91785decf 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -97,6 +97,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } } + // Update the status of deployments effected by the plan. + if len(results.DeploymentUpdates) != 0 { + s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) + } + // Attach the job to all the allocations. It is pulled out in the payload to // avoid the redundancy of encoding, but should be denormalized prior to // being inserted into MemDB. @@ -128,6 +133,38 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return nil } +// upsertDeploymentUpdates updates the deployments given the passed status +// updates. +func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error { + for _, d := range updates { + raw, err := txn.First("deployment", "id", d.DeploymentID) + if err != nil { + return err + } + if raw == nil { + return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", d.DeploymentID) + } + + copy := raw.(*structs.Deployment).Copy() + + // Apply the new status + copy.Status = d.Status + copy.StatusDescription = d.StatusDescription + copy.ModifyIndex = index + + // Insert the deployment + if err := txn.Insert("deployment", copy); err != nil { + return err + } + } + + if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} + // UpsertJobSummary upserts a job summary into the state store. func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error { txn := s.db.Txn(true) @@ -2199,7 +2236,7 @@ func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *st // If there was no existing allocation, this is a placement and we increment // the placement - existingHealthSet := existing.DeploymentStatus != nil && existing.DeploymentStatus.Healthy != nil + existingHealthSet := existing != nil && existing.DeploymentStatus != nil && existing.DeploymentStatus.Healthy != nil allocHealthSet := alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil if existing == nil { placed++ diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 86319ec28..2151a4a42 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -184,7 +184,7 @@ func TestStateStore_UpsertPlanResults_Deployment_CancelOld(t *testing.T) { CreatedDeployment: dnew, } - err := state.UpsertPlanResults(1000, &res) + err := state.UpsertPlanResults(1001, &res) if err != nil { t.Fatalf("err: %v", err) } @@ -221,7 +221,87 @@ func TestStateStore_UpsertPlanResults_Deployment_CancelOld(t *testing.T) { if err != nil || doutstandingout == nil { t.Fatalf("bad: %v %v", err, doutstandingout) } - if doutstandingout.Status != structs.DeploymentStatusCancelled || doutstandingout.ModifyIndex != 1000 { + if doutstandingout.Status != structs.DeploymentStatusCancelled || doutstandingout.ModifyIndex != 1001 { + t.Fatalf("bad: %v", doutstandingout) + } + + if watchFired(ws) { + t.Fatalf("bad") + } +} + +// This test checks that deployment updates are applied correctly +func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) { + state := testStateStore(t) + + // Create a job that applies to all + job := mock.Job() + if err := state.UpsertJob(998, job); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a deployment that we will update its status + doutstanding := mock.Deployment() + doutstanding.JobID = job.ID + + if err := state.UpsertDeployment(1000, doutstanding, false); err != nil { + t.Fatalf("err: %v", err) + } + + alloc := mock.Alloc() + alloc.Job = nil + + dnew := mock.Deployment() + dnew.JobID = job.ID + alloc.DeploymentID = dnew.ID + + // Update the old deployment + update := &structs.DeploymentStatusUpdate{ + DeploymentID: doutstanding.ID, + Status: "foo", + StatusDescription: "bar", + } + + // Create a plan result + res := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + Job: job, + }, + CreatedDeployment: dnew, + DeploymentUpdates: []*structs.DeploymentStatusUpdate{update}, + } + + err := state.UpsertPlanResults(1000, &res) + if err != nil { + t.Fatalf("err: %v", err) + } + + ws := memdb.NewWatchSet() + + // Check the deployments are correctly updated. + dout, err := state.DeploymentByID(ws, dnew.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if dout == nil { + t.Fatalf("bad: nil deployment") + } + + tg, ok := dout.TaskGroups[alloc.TaskGroup] + if !ok { + t.Fatalf("bad: nil deployment state") + } + if tg == nil || tg.PlacedAllocs != 1 { + t.Fatalf("bad: %v", dout) + } + + doutstandingout, err := state.DeploymentByID(ws, doutstanding.ID) + if err != nil || doutstandingout == nil { + t.Fatalf("bad: %v %v", err, doutstandingout) + } + if doutstandingout.Status != update.Status || doutstandingout.StatusDescription != update.StatusDescription || doutstandingout.ModifyIndex != 1000 { t.Fatalf("bad: %v", doutstandingout) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 228853e82..2ece3ad71 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -372,6 +372,11 @@ type ApplyPlanResultsRequest struct { // event. Any existing deployment should be cancelled when the new // deployment is created. CreatedDeployment *Deployment + + // DeploymentUpdates is a set of status updates to apply to the given + // deployments. This allows the scheduler to cancel any unneeded deployment + // because the job is stopped or the update block is removed. + DeploymentUpdates []*DeploymentStatusUpdate } // AllocUpdateRequest is used to submit changes to allocations, either @@ -3568,6 +3573,18 @@ func (d *DeploymentState) Copy() *DeploymentState { return c } +// DeploymentStatusUpdate is used to update the status of a given deployment +type DeploymentStatusUpdate struct { + // DeploymentID is the ID of the deployment to update + DeploymentID string + + // Status is the new status of the deployment. + Status string + + // StatusDescription is the new status description of the deployment. + StatusDescription string +} + const ( AllocDesiredStatusRun = "run" // Allocation should run AllocDesiredStatusStop = "stop" // Allocation should stop @@ -4266,8 +4283,15 @@ type Plan struct { Annotations *PlanAnnotations // CreatedDeployment is the deployment created by the scheduler that should - // be applied by the planner. + // be applied by the planner. A created deployment will cancel all other + // deployments for a given job as there can only be a single running + // deployment. CreatedDeployment *Deployment + + // DeploymentUpdates is a set of status updates to apply to the given + // deployments. This allows the scheduler to cancel any unneeded deployment + // because the job is stopped or the update block is removed. + DeploymentUpdates []*DeploymentStatusUpdate } // AppendUpdate marks the allocation for eviction. The clientStatus of the