Plan allows updating the status of deployments

This commit is contained in:
Alex Dadgar
2017-05-11 12:49:04 -07:00
parent 710945bffa
commit 7c2e59f32a
4 changed files with 146 additions and 4 deletions

View File

@@ -136,6 +136,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
Alloc: make([]*structs.Allocation, 0, minUpdates),
},
CreatedDeployment: plan.CreatedDeployment,
DeploymentUpdates: plan.DeploymentUpdates,
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)

View File

@@ -97,6 +97,11 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
}
}
// Update the status of deployments effected by the plan.
if len(results.DeploymentUpdates) != 0 {
s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn)
}
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
@@ -128,6 +133,38 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
return nil
}
// upsertDeploymentUpdates updates the deployments given the passed status
// updates.
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
for _, d := range updates {
raw, err := txn.First("deployment", "id", d.DeploymentID)
if err != nil {
return err
}
if raw == nil {
return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", d.DeploymentID)
}
copy := raw.(*structs.Deployment).Copy()
// Apply the new status
copy.Status = d.Status
copy.StatusDescription = d.StatusDescription
copy.ModifyIndex = index
// Insert the deployment
if err := txn.Insert("deployment", copy); err != nil {
return err
}
}
if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
// UpsertJobSummary upserts a job summary into the state store.
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
txn := s.db.Txn(true)
@@ -2199,7 +2236,7 @@ func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *st
// If there was no existing allocation, this is a placement and we increment
// the placement
existingHealthSet := existing.DeploymentStatus != nil && existing.DeploymentStatus.Healthy != nil
existingHealthSet := existing != nil && existing.DeploymentStatus != nil && existing.DeploymentStatus.Healthy != nil
allocHealthSet := alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil
if existing == nil {
placed++

View File

@@ -184,7 +184,7 @@ func TestStateStore_UpsertPlanResults_Deployment_CancelOld(t *testing.T) {
CreatedDeployment: dnew,
}
err := state.UpsertPlanResults(1000, &res)
err := state.UpsertPlanResults(1001, &res)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -221,7 +221,87 @@ func TestStateStore_UpsertPlanResults_Deployment_CancelOld(t *testing.T) {
if err != nil || doutstandingout == nil {
t.Fatalf("bad: %v %v", err, doutstandingout)
}
if doutstandingout.Status != structs.DeploymentStatusCancelled || doutstandingout.ModifyIndex != 1000 {
if doutstandingout.Status != structs.DeploymentStatusCancelled || doutstandingout.ModifyIndex != 1001 {
t.Fatalf("bad: %v", doutstandingout)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
// This test checks that deployment updates are applied correctly
func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) {
state := testStateStore(t)
// Create a job that applies to all
job := mock.Job()
if err := state.UpsertJob(998, job); err != nil {
t.Fatalf("err: %v", err)
}
// Create a deployment that we will update its status
doutstanding := mock.Deployment()
doutstanding.JobID = job.ID
if err := state.UpsertDeployment(1000, doutstanding, false); err != nil {
t.Fatalf("err: %v", err)
}
alloc := mock.Alloc()
alloc.Job = nil
dnew := mock.Deployment()
dnew.JobID = job.ID
alloc.DeploymentID = dnew.ID
// Update the old deployment
update := &structs.DeploymentStatusUpdate{
DeploymentID: doutstanding.ID,
Status: "foo",
StatusDescription: "bar",
}
// Create a plan result
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
Job: job,
},
CreatedDeployment: dnew,
DeploymentUpdates: []*structs.DeploymentStatusUpdate{update},
}
err := state.UpsertPlanResults(1000, &res)
if err != nil {
t.Fatalf("err: %v", err)
}
ws := memdb.NewWatchSet()
// Check the deployments are correctly updated.
dout, err := state.DeploymentByID(ws, dnew.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if dout == nil {
t.Fatalf("bad: nil deployment")
}
tg, ok := dout.TaskGroups[alloc.TaskGroup]
if !ok {
t.Fatalf("bad: nil deployment state")
}
if tg == nil || tg.PlacedAllocs != 1 {
t.Fatalf("bad: %v", dout)
}
doutstandingout, err := state.DeploymentByID(ws, doutstanding.ID)
if err != nil || doutstandingout == nil {
t.Fatalf("bad: %v %v", err, doutstandingout)
}
if doutstandingout.Status != update.Status || doutstandingout.StatusDescription != update.StatusDescription || doutstandingout.ModifyIndex != 1000 {
t.Fatalf("bad: %v", doutstandingout)
}

View File

@@ -372,6 +372,11 @@ type ApplyPlanResultsRequest struct {
// event. Any existing deployment should be cancelled when the new
// deployment is created.
CreatedDeployment *Deployment
// DeploymentUpdates is a set of status updates to apply to the given
// deployments. This allows the scheduler to cancel any unneeded deployment
// because the job is stopped or the update block is removed.
DeploymentUpdates []*DeploymentStatusUpdate
}
// AllocUpdateRequest is used to submit changes to allocations, either
@@ -3568,6 +3573,18 @@ func (d *DeploymentState) Copy() *DeploymentState {
return c
}
// DeploymentStatusUpdate is used to update the status of a given deployment
type DeploymentStatusUpdate struct {
// DeploymentID is the ID of the deployment to update
DeploymentID string
// Status is the new status of the deployment.
Status string
// StatusDescription is the new status description of the deployment.
StatusDescription string
}
const (
AllocDesiredStatusRun = "run" // Allocation should run
AllocDesiredStatusStop = "stop" // Allocation should stop
@@ -4266,8 +4283,15 @@ type Plan struct {
Annotations *PlanAnnotations
// CreatedDeployment is the deployment created by the scheduler that should
// be applied by the planner.
// be applied by the planner. A created deployment will cancel all other
// deployments for a given job as there can only be a single running
// deployment.
CreatedDeployment *Deployment
// DeploymentUpdates is a set of status updates to apply to the given
// deployments. This allows the scheduler to cancel any unneeded deployment
// because the job is stopped or the update block is removed.
DeploymentUpdates []*DeploymentStatusUpdate
}
// AppendUpdate marks the allocation for eviction. The clientStatus of the