diff --git a/nomad/fsm.go b/nomad/fsm.go
index 42b74afc2..6f76f7e5e 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -155,6 +155,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyUpsertVaultAccessor(buf[1:], log.Index)
 	case structs.VaultAccessorDegisterRequestType:
 		return n.applyDeregisterVaultAccessor(buf[1:], log.Index)
+	case structs.ApplyPlanResultsRequestType:
+		return n.applyPlanResults(buf[1:], log.Index)
 	default:
 		if ignoreUnknown {
 			n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -545,6 +547,22 @@ func (n *nomadFSM) applyDeregisterVaultAccessor(buf []byte, index uint64) interf
 	return nil
 }

+// applyPlanResults applies the results of a plan application
+func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now())
+	var req structs.ApplyPlanResultsRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+
+	if err := n.state.UpsertPlanResults(index, &req); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: ApplyPlan failed: %v", err)
+		return err
+	}
+
+	return nil
+}
+
 func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
 	// Create a new snapshot
 	snap, err := n.state.Snapshot()
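A note on the framing the new case relies on (a sketch, not part of the patch): structs.Encode prefixes the msgpack body with the message type byte that Apply switches on, and the handler decodes everything after that byte.

func roundTripPlanResults(req *structs.ApplyPlanResultsRequest) (*structs.ApplyPlanResultsRequest, error) {
	// The first byte of the encoded buffer is the message type;
	// nomadFSM.Apply strips it and hands buf[1:] to applyPlanResults.
	buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
	if err != nil {
		return nil, err
	}

	var out structs.ApplyPlanResultsRequest
	if err := structs.Decode(buf[1:], &out); err != nil {
		return nil, err
	}
	return &out, nil
}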
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index b44efc12a..2eaf4d079 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -1484,3 +1484,96 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
 		t.Fatalf("expected: %#v, actual: %#v", &expected, out2)
 	}
 }
+
+func TestFSM_ApplyPlanResults(t *testing.T) {
+	fsm := testFSM(t)
+
+	// Create the request and create a deployment
+	alloc := mock.Alloc()
+	job := alloc.Job
+	alloc.Job = nil
+
+	d := mock.Deployment()
+	d.JobID = job.ID
+	d.JobModifyIndex = job.ModifyIndex
+	d.JobVersion = job.Version
+
+	alloc.DeploymentID = d.ID
+
+	fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
+	req := structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Job:   job,
+			Alloc: []*structs.Allocation{alloc},
+		},
+		CreatedDeployment: d,
+	}
+	buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	resp := fsm.Apply(makeLog(buf))
+	if resp != nil {
+		t.Fatalf("resp: %v", resp)
+	}
+
+	// Verify the allocation is registered
+	ws := memdb.NewWatchSet()
+	out, err := fsm.State().AllocByID(ws, alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	alloc.CreateIndex = out.CreateIndex
+	alloc.ModifyIndex = out.ModifyIndex
+	alloc.AllocModifyIndex = out.AllocModifyIndex
+
+	// Job should be re-attached
+	alloc.Job = job
+	if !reflect.DeepEqual(alloc, out) {
+		t.Fatalf("bad: %#v %#v", alloc, out)
+	}
+
+	dout, err := fsm.State().DeploymentByID(ws, d.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if tg, ok := dout.TaskGroups[alloc.TaskGroup]; !ok || tg.PlacedAllocs != 1 {
+		t.Fatalf("err: %v %v", tg, err)
+	}
+
+	// Ensure that the original job is used
+	evictAlloc := alloc.Copy()
+	job = mock.Job()
+	job.Priority = 123
+
+	evictAlloc.Job = nil
+	evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
+	req2 := structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Job:   job,
+			Alloc: []*structs.Allocation{evictAlloc},
+		},
+	}
+	buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	resp = fsm.Apply(makeLog(buf))
+	if resp != nil {
+		t.Fatalf("resp: %v", resp)
+	}
+
+	// Verify we are evicted
+	out, err = fsm.State().AllocByID(ws, alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out.DesiredStatus != structs.AllocDesiredStatusEvict {
+		t.Fatalf("alloc not evicted")
+	}
+	if out.Job == nil || out.Job.Priority == 123 {
+		t.Fatalf("bad job")
+	}
+}
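makeLog above is a pre-existing helper in fsm_test.go, not added by this patch; its assumed shape is just a wrapper that lets the FSM be driven without a running Raft cluster:

// Assumed shape of the existing makeLog helper.
func makeLog(data []byte) *raft.Log {
	return &raft.Log{
		Index: 1,
		Term:  1,
		Type:  raft.LogCommand,
		Data:  data,
	}
}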
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 985cdc02a..95b892328 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -559,7 +559,7 @@ func (j *Job) GetJobVersions(args *structs.JobSpecificRequest,
 		reply.Index = out[0].ModifyIndex
 	} else {
 		// Use the last index that affected the job_version table
-		index, err := state.Index("job_versions")
+		index, err := state.Index("job_version")
 		if err != nil {
 			return err
 		}
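The one-word rename above matters because StateStore.Index returns zero rather than an error for a table name that is never written, so the blocking-query fallback index was silently wrong. A sketch of the pattern being repaired (state and reply come from the surrounding handler):

// With the misspelled "job_versions", this lookup quietly yielded 0.
index, err := state.Index("job_version")
if err != nil {
	return err
}
reply.Index = index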
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index d2f2ca700..8509b83ad 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -310,11 +310,16 @@ func VaultAccessor() *structs.VaultAccessor {

 func Deployment() *structs.Deployment {
 	return &structs.Deployment{
-		ID:             structs.GenerateUUID(),
-		JobID:          structs.GenerateUUID(),
-		JobVersion:     2,
-		JobModifyIndex: 20,
-		JobCreateIndex: 18,
+		ID:             structs.GenerateUUID(),
+		JobID:          structs.GenerateUUID(),
+		JobVersion:     2,
+		JobModifyIndex: 20,
+		JobCreateIndex: 18,
+		TaskGroups: map[string]*structs.DeploymentState{
+			"web": &structs.DeploymentState{
+				DesiredTotal: 10,
+			},
+		},
 		Status:            structs.DeploymentStatusRunning,
 		StatusDescription: structs.DeploymentStatusRunning,
 		ModifyIndex:       23,
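The enriched mock seeds a single "web" task group, which the deployment-counting tests below rely on. A small usage sketch (names from this diff; the helper itself is illustrative only):

// deploymentProgress reads the counters that updateDeploymentWithAlloc
// (below) maintains against the group's DesiredTotal.
func deploymentProgress(d *structs.Deployment, group string) (placed, desired int) {
	state, ok := d.TaskGroups[group]
	if !ok {
		return 0, 0
	}
	return state.PlacedAllocs, state.DesiredTotal
}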
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 5512dff63..f18842fb4 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -106,7 +106,7 @@ func (s *Server) planApply() {
 		}

 		// Dispatch the Raft transaction for the plan
-		future, err := s.applyPlan(pending.plan.Job, result, snap)
+		future, err := s.applyPlan(pending.plan, result, snap)
 		if err != nil {
 			s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
 			pending.respond(nil, err)
@@ -120,16 +120,22 @@ func (s *Server) planApply() {
 }

 // applyPlan is used to apply the plan result and to return the alloc index
-func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
+func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
 	// Determine the minimum number of updates, could be more if there
 	// are multiple updates per node
 	minUpdates := len(result.NodeUpdate)
 	minUpdates += len(result.NodeAllocation)

+	// Grab the job
+	job := plan.Job
+
 	// Setup the update request
-	req := structs.AllocUpdateRequest{
-		Job:   job,
-		Alloc: make([]*structs.Allocation, 0, minUpdates),
+	req := structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Job:   job,
+			Alloc: make([]*structs.Allocation, 0, minUpdates),
+		},
+		CreatedDeployment: plan.CreatedDeployment,
 	}
 	for _, updateList := range result.NodeUpdate {
 		req.Alloc = append(req.Alloc, updateList...)
@@ -148,20 +154,15 @@ func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *s
 	}

 	// Dispatch the Raft transaction
-	future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
+	future, err := s.raftApplyFuture(structs.ApplyPlanResultsRequestType, &req)
 	if err != nil {
 		return nil, err
 	}

 	// Optimistically apply to our state view
 	if snap != nil {
-		// Attach the job to all the allocations. It is pulled out in the
-		// payload to avoid the redundancy of encoding, but should be denormalized
-		// prior to being inserted into MemDB.
-		structs.DenormalizeAllocationJobs(req.Job, req.Alloc)
-
 		nextIdx := s.raft.AppliedIndex() + 1
-		if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
+		if err := snap.UpsertPlanResults(nextIdx, &req); err != nil {
 			return future, err
 		}
 	}
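The deleted DenormalizeAllocationJobs call is not gone; it moved into StateStore.UpsertPlanResults so the Raft FSM path and the optimistic snapshot path share it. For reference, the helper's assumed shape (it lives in nomad/structs and is unchanged by this patch):

// Assumed shape of structs.DenormalizeAllocationJobs: re-attach the job
// that the wire format strips to avoid encoding it once per allocation.
func DenormalizeAllocationJobs(job *structs.Job, allocs []*structs.Allocation) {
	if job != nil {
		for _, alloc := range allocs {
			if alloc.Job == nil {
				alloc.Job = job
			}
		}
	}
}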
diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go
index 17fc0c6db..d6e1cd193 100644
--- a/nomad/plan_apply_test.go
+++ b/nomad/plan_apply_test.go
@@ -70,7 +70,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
 	// Register alloc
 	alloc := mock.Alloc()
 	s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID))
-	plan := &structs.PlanResult{
+	planRes := &structs.PlanResult{
 		NodeAllocation: map[string][]*structs.Allocation{
 			node.ID: []*structs.Allocation{alloc},
 		},
@@ -82,8 +82,14 @@
 		t.Fatalf("err: %v", err)
 	}

+	// Create the plan with a deployment
+	plan := &structs.Plan{
+		Job:               alloc.Job,
+		CreatedDeployment: mock.Deployment(),
+	}
+
 	// Apply the plan
-	future, err := s1.applyPlan(alloc.Job, plan, snap)
+	future, err := s1.applyPlan(plan, planRes, snap)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -94,6 +100,10 @@
 		t.Fatalf("bad: %v %v", out, err)
 	}

+	if out, err := snap.DeploymentByID(ws, plan.CreatedDeployment.ID); err != nil || out == nil {
+		t.Fatalf("bad: %v %v", out, err)
+	}
+
 	// Check plan does apply cleanly
 	index, err := planWaitFuture(future)
 	if err != nil {
@@ -104,7 +114,8 @@
 	}

 	// Lookup the allocation
-	out, err := s1.fsm.State().AllocByID(ws, alloc.ID)
+	fsmState := s1.fsm.State()
+	out, err := fsmState.AllocByID(ws, alloc.ID)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -112,6 +123,15 @@
 		t.Fatalf("missing alloc")
 	}

+	// Lookup the deployment
+	dout, err := fsmState.DeploymentByID(ws, plan.CreatedDeployment.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if dout == nil {
+		t.Fatalf("missing deployment")
+	}
+
 	// Evict alloc, Register alloc2
 	allocEvict := new(structs.Allocation)
 	*allocEvict = *alloc
@@ -120,7 +140,7 @@
 	allocEvict.Job = nil
 	alloc2 := mock.Alloc()
 	s1.State().UpsertJobSummary(1500, mock.JobSummary(alloc2.JobID))
-	plan = &structs.PlanResult{
+	planRes = &structs.PlanResult{
 		NodeUpdate: map[string][]*structs.Allocation{
 			node.ID: []*structs.Allocation{allocEvict},
 		},
@@ -136,7 +156,10 @@
 	}

 	// Apply the plan
-	future, err = s1.applyPlan(job, plan, snap)
+	plan = &structs.Plan{
+		Job: job,
+	}
+	future, err = s1.applyPlan(plan, planRes, snap)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
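planWaitFuture, used by the test above, is an existing helper rather than part of this patch; its assumed shape is a blocking wait that surfaces the index at which the Raft entry committed:

// Assumed shape of the existing planWaitFuture helper.
func planWaitFuture(future raft.ApplyFuture) (uint64, error) {
	if err := future.Error(); err != nil {
		return 0, err
	}
	return future.Index(), nil
}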
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 7124f16af..510ef1299 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -85,11 +85,69 @@ func (s *StateStore) Abandon() {
 	close(s.abandonCh)
 }

+// UpsertPlanResults is used to upsert the results of a plan.
+func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	// Upsert the newly created deployment
+	if results.CreatedDeployment != nil {
+		if err := s.upsertDeploymentImpl(index, results.CreatedDeployment, true, txn); err != nil {
+			return err
+		}
+	}
+
+	// Attach the job to all the allocations. It is pulled out in the payload to
+	// avoid the redundancy of encoding, but should be denormalized prior to
+	// being inserted into MemDB.
+	structs.DenormalizeAllocationJobs(results.Job, results.Alloc)
+
+	// Calculate the total resources of allocations. It is pulled out in the
+	// payload to avoid encoding something that can be computed, but should be
+	// denormalized prior to being inserted into MemDB.
+	for _, alloc := range results.Alloc {
+		if alloc.Resources != nil {
+			continue
+		}
+
+		alloc.Resources = new(structs.Resources)
+		for _, task := range alloc.TaskResources {
+			alloc.Resources.Add(task)
+		}
+
+		// Add the shared resources
+		alloc.Resources.Add(alloc.SharedResources)
+	}
+
+	// Upsert the allocations
+	if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil {
+		return err
+	}
+
+	txn.Commit()
+	return nil
+}
+
 // UpsertJobSummary upserts a job summary into the state store.
 func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()

+	// Check if the job summary already exists
+	existing, err := txn.First("job_summary", "id", jobSummary.JobID)
+	if err != nil {
+		return fmt.Errorf("job summary lookup failed: %v", err)
+	}
+
+	// Setup the indexes correctly
+	if existing != nil {
+		jobSummary.CreateIndex = existing.(*structs.JobSummary).CreateIndex
+		jobSummary.ModifyIndex = index
+	} else {
+		jobSummary.CreateIndex = index
+		jobSummary.ModifyIndex = index
+	}
+
 	// Update the index
 	if err := txn.Insert("job_summary", jobSummary); err != nil {
 		return err
@@ -124,7 +182,14 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error {
 func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment, cancelPrior bool) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()
+	if err := s.upsertDeploymentImpl(index, deployment, cancelPrior, txn); err != nil {
+		return err
+	}
+	txn.Commit()
+	return nil
+}

+func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, cancelPrior bool, txn *memdb.Txn) error {
 	// Go through and cancel any active deployment for the job.
 	if cancelPrior {
 		iter, err := txn.Get("deployment", "job_prefix", deployment.JobID)
@@ -157,6 +222,21 @@
 		}
 	}

+	// Check if the deployment already exists
+	existing, err := txn.First("deployment", "id", deployment.ID)
+	if err != nil {
+		return fmt.Errorf("deployment lookup failed: %v", err)
+	}
+
+	// Setup the indexes correctly
+	if existing != nil {
+		deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex
+		deployment.ModifyIndex = index
+	} else {
+		deployment.CreateIndex = index
+		deployment.ModifyIndex = index
+	}
+
 	// Insert the deployment
 	if err := txn.Insert("deployment", deployment); err != nil {
 		return err
@@ -167,7 +247,6 @@ func (s *StateStore) UpsertDeployme
 	}

 	if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
 		return fmt.Errorf("index update failed: %v", err)
 	}

-	txn.Commit()
 	return nil
 }

@@ -186,7 +265,10 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error

 func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) {
 	txn := s.db.Txn(false)
+	return s.deploymentByIDImpl(ws, deploymentID, txn)
+}

+func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *memdb.Txn) (*structs.Deployment, error) {
 	watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID)
 	if err != nil {
 		return nil, fmt.Errorf("node lookup failed: %v", err)
@@ -200,7 +282,7 @@
 	return nil, nil
 }

-func (s *StateStore) DeploymentByJobID(ws memdb.WatchSet, jobID string) ([]*structs.Deployment, error) {
+func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, jobID string) ([]*structs.Deployment, error) {
 	txn := s.db.Txn(false)

 	// Get an iterator over the deployments
@@ -231,6 +313,39 @@
 	return out, nil
 }

+func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*structs.Deployment, error) {
+	txn := s.db.Txn(false)
+
+	// Get an iterator over the deployments
+	iter, err := txn.Get("deployment", "job_prefix", jobID)
+	if err != nil {
+		return nil, err
+	}
+
+	ws.Add(iter.WatchCh())
+
+	var out *structs.Deployment
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+
+		d := raw.(*structs.Deployment)
+
+		// Filter non-exact matches
+		if d.JobID != jobID {
+			continue
+		}
+
+		if out == nil || out.CreateIndex < d.CreateIndex {
+			out = d
+		}
+	}
+
+	return out, nil
+}
+
 // DeleteDeployment is used to delete a deployment by ID
 func (s *StateStore) DeleteDeployment(index uint64, deploymentID string) error {
 	txn := s.db.Txn(true)
@@ -1280,12 +1395,21 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
 	return nil
 }

-// UpsertAllocs is used to evict a set of allocations
-// and allocate new ones at the same time.
+// UpsertAllocs is used to evict a set of allocations and allocate new ones at
+// the same time.
 func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()
+	if err := s.upsertAllocsImpl(index, allocs, txn); err != nil {
+		return err
+	}
+	txn.Commit()
+	return nil
+}

+// upsertAllocsImpl is the actual implementation of UpsertAllocs so that it may
+// be used with an existing transaction.
+func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *memdb.Txn) error {
 	// Handle the allocations
 	jobs := make(map[string]string, 1)
 	for _, alloc := range allocs {
@@ -1319,6 +1443,9 @@
 		alloc.ModifyIndex = index
 		alloc.AllocModifyIndex = index

+		// Keep the client's task states
+		alloc.TaskStates = exist.TaskStates
+
 		// If the scheduler is marking this allocation as lost we do not
 		// want to reuse the status of the existing allocation.
 		if alloc.ClientStatus != structs.AllocClientStatusLost {
@@ -1332,6 +1459,10 @@
 			}
 		}

+		if err := s.updateDeploymentWithAlloc(index, alloc, exist, txn); err != nil {
+			return fmt.Errorf("error updating deployment: %v", err)
+		}
+
 		if err := s.updateSummaryWithAlloc(index, alloc, exist, txn); err != nil {
 			return fmt.Errorf("error updating job summary: %v", err)
 		}
@@ -1364,7 +1495,6 @@
 		return fmt.Errorf("setting job status failed: %v", err)
 	}

-	txn.Commit()
 	return nil
 }

@@ -2034,6 +2164,86 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
 	return nil
 }

+// updateDeploymentWithAlloc is used to update the deployment state associated
+// with the given allocation
+func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *memdb.Txn) error {
+	// Nothing to do if the allocation is not associated with a deployment
+	if alloc.DeploymentID == "" {
+		return nil
+	}
+
+	// Get the deployment
+	ws := memdb.NewWatchSet()
+	deployment, err := s.deploymentByIDImpl(ws, alloc.DeploymentID, txn)
+	if err != nil {
+		return err
+	}
+	if deployment == nil {
+		return fmt.Errorf("allocation %q references unknown deployment %q", alloc.ID, alloc.DeploymentID)
+	}
+
+	// Retrieve the deployment state object
+	_, ok := deployment.TaskGroups[alloc.TaskGroup]
+	if !ok {
+		// If the task group isn't part of the deployment, the task group wasn't
+		// part of a rolling update so nothing to do
+		return nil
+	}
+
+	// Do not modify in-place. Instead keep track of what must be done
+	placed := 0
+
+	// TODO test when I am sure of what this method will do
+	// XXX Unclear whether this will be helpful because a separate code path is
+	// likely needed for setting health
+	healthy := 0
+	unhealthy := 0
+
+	// If there was no existing allocation, this is a placement and we increment
+	// the placed count
+	if existing == nil {
+		placed++
+	} else if existing.DeploymentHealth == nil && alloc.DeploymentHealth != nil {
+		if *alloc.DeploymentHealth {
+			healthy++
+		} else {
+			unhealthy++
+		}
+	} else if existing.DeploymentHealth != nil && alloc.DeploymentHealth != nil {
+		// See if it has gone from healthy to unhealthy
+		if *existing.DeploymentHealth && !*alloc.DeploymentHealth {
+			healthy--
+			unhealthy++
+		}
+	}
+
+	// Nothing to do
+	if placed == 0 && healthy == 0 && unhealthy == 0 {
+		return nil
+	}
+
+	// Create a copy of the deployment object
+	deploymentCopy := deployment.Copy()
+	deploymentCopy.ModifyIndex = index
+
+	if unhealthy != 0 {
+		deploymentCopy.Status = structs.DeploymentStatusFailed
+		deploymentCopy.StatusDescription = "Allocation(s) marked as unhealthy"
+	}
+
+	state := deploymentCopy.TaskGroups[alloc.TaskGroup]
+	state.PlacedAllocs += placed
+	state.HealthyAllocs += healthy
+	state.UnhealthyAllocs += unhealthy
+
+	// Upsert the new deployment
+	if err := s.upsertDeploymentImpl(index, deploymentCopy, false, txn); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // updateSummaryWithAlloc updates the job summary when allocations are updated
 // or inserted
 func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation,
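A worked example of the resource denormalization in UpsertPlanResults above (field values are illustrative, not from the patch):

alloc := &structs.Allocation{
	TaskResources: map[string]*structs.Resources{
		"web": {CPU: 500, MemoryMB: 256},
		"log": {CPU: 100, MemoryMB: 64},
	},
	SharedResources: &structs.Resources{DiskMB: 150},
}

// Mirrors the loop in UpsertPlanResults: per-task totals plus shared disk.
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
	alloc.Resources.Add(task)
}
alloc.Resources.Add(alloc.SharedResources)
// alloc.Resources now holds CPU: 600, MemoryMB: 320, DiskMB: 150.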
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 98d4ced2f..86319ec28 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -25,13 +25,218 @@ func testStateStore(t *testing.T) *StateStore {
 	return state
 }

+// This test checks that:
+// 1) The job is denormalized
+// 2) Allocations are created
+func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing.T) {
+	state := testStateStore(t)
+	alloc := mock.Alloc()
+	job := alloc.Job
+	alloc.Job = nil
+
+	if err := state.UpsertJob(999, job); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a plan result
+	res := structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Alloc: []*structs.Allocation{alloc},
+			Job:   job,
+		},
+	}
+
+	err := state.UpsertPlanResults(1000, &res)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	ws := memdb.NewWatchSet()
+	out, err := state.AllocByID(ws, alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if !reflect.DeepEqual(alloc, out) {
+		t.Fatalf("bad: %#v %#v", alloc, out)
+	}
+
+	index, err := state.Index("allocs")
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if index != 1000 {
+		t.Fatalf("bad: %d", index)
+	}
+
+	if watchFired(ws) {
+		t.Fatalf("bad")
+	}
+}
+
+// This test checks that the deployment is created and allocations count towards
+// the deployment
+func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
+	state := testStateStore(t)
+	alloc := mock.Alloc()
+	alloc2 := mock.Alloc()
+	job := alloc.Job
+	alloc.Job = nil
+	alloc2.Job = nil
+
+	d := mock.Deployment()
+	alloc.DeploymentID = d.ID
+	alloc2.DeploymentID = d.ID
+
+	if err := state.UpsertJob(999, job); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a plan result
+	res := structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Alloc: []*structs.Allocation{alloc, alloc2},
+			Job:   job,
+		},
+		CreatedDeployment: d,
+	}
+
+	err := state.UpsertPlanResults(1000, &res)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	ws := memdb.NewWatchSet()
+	out, err := state.AllocByID(ws, alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if !reflect.DeepEqual(alloc, out) {
+		t.Fatalf("bad: %#v %#v", alloc, out)
+	}
+
+	dout, err := state.DeploymentByID(ws, d.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if dout == nil {
+		t.Fatalf("bad: nil deployment")
+	}
+
+	tg, ok := dout.TaskGroups[alloc.TaskGroup]
+	if !ok {
+		t.Fatalf("bad: nil deployment state")
+	}
+	if tg == nil || tg.PlacedAllocs != 2 {
+		t.Fatalf("bad: %v", dout)
+	}
+
+	if watchFired(ws) {
+		t.Fatalf("bad")
+	}
+}
+
+// This test checks that when a new deployment is made, the old ones are
+// cancelled.
+func TestStateStore_UpsertPlanResults_Deployment_CancelOld(t *testing.T) {
+	state := testStateStore(t)
+
+	// Create a job that applies to all
+	job := mock.Job()
+	if err := state.UpsertJob(998, job); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create two deployments:
+	// One that is already terminal and assert its modify index isn't touched
+	// A second that is outstanding and assert it gets cancelled.
+	dterminal := mock.Deployment()
+	dterminal.Status = structs.DeploymentStatusFailed
+	dterminal.JobID = job.ID
+	doutstanding := mock.Deployment()
+	doutstanding.JobID = job.ID
+
+	if err := state.UpsertDeployment(999, dterminal, false); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if err := state.UpsertDeployment(1000, doutstanding, false); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	alloc := mock.Alloc()
+	alloc2 := mock.Alloc()
+	alloc.Job = nil
+	alloc2.Job = nil
+
+	dnew := mock.Deployment()
+	dnew.JobID = job.ID
+	alloc.DeploymentID = dnew.ID
+	alloc2.DeploymentID = dnew.ID
+
+	// Create a plan result
+	res := structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Alloc: []*structs.Allocation{alloc, alloc2},
+			Job:   job,
+		},
+		CreatedDeployment: dnew,
+	}
+
+	err := state.UpsertPlanResults(1000, &res)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	ws := memdb.NewWatchSet()
+
+	// Check the deployments are correctly updated.
+	dout, err := state.DeploymentByID(ws, dnew.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if dout == nil {
+		t.Fatalf("bad: nil deployment")
+	}
+
+	tg, ok := dout.TaskGroups[alloc.TaskGroup]
+	if !ok {
+		t.Fatalf("bad: nil deployment state")
+	}
+	if tg == nil || tg.PlacedAllocs != 2 {
+		t.Fatalf("bad: %v", dout)
+	}
+
+	dterminalout, err := state.DeploymentByID(ws, dterminal.ID)
+	if err != nil || dterminalout == nil {
+		t.Fatalf("bad: %v %v", err, dterminalout)
+	}
+	if !reflect.DeepEqual(dterminalout, dterminal) {
+		t.Fatalf("bad: %v %v", dterminal, dterminalout)
+	}
+
+	doutstandingout, err := state.DeploymentByID(ws, doutstanding.ID)
+	if err != nil || doutstandingout == nil {
+		t.Fatalf("bad: %v %v", err, doutstandingout)
+	}
+	if doutstandingout.Status != structs.DeploymentStatusCancelled || doutstandingout.ModifyIndex != 1000 {
+		t.Fatalf("bad: %v", doutstandingout)
+	}
+
+	if watchFired(ws) {
+		t.Fatalf("bad")
+	}
+}
+
 func TestStateStore_UpsertDeployment(t *testing.T) {
 	state := testStateStore(t)
 	deployment := mock.Deployment()

 	// Create a watchset so we can test that upsert fires the watch
 	ws := memdb.NewWatchSet()
-	_, err := state.DeploymentByJobID(ws, deployment.ID)
+	_, err := state.DeploymentsByJobID(ws, deployment.ID)
 	if err != nil {
 		t.Fatalf("bad: %v", err)
 	}
@@ -103,7 +308,7 @@ func TestStateStore_UpsertDeployment_Cancel(t *testing.T) {
 	}

 	// Get the deployments by job
-	deployments, err := state.DeploymentByJobID(ws, deployment.JobID)
+	deployments, err := state.DeploymentsByJobID(ws, deployment.JobID)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -112,6 +317,15 @@
 		t.Fatalf("got %d deployments; want %v", l, 2)
 	}

+	latest, err := state.LatestDeploymentByJobID(ws, deployment.JobID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if latest == nil || latest.CreateIndex != 1001 {
+		t.Fatalf("bad: %+v", latest)
+	}
+
 	index, err := state.Index("deployment")
 	if err != nil {
 		t.Fatalf("err: %v", err)
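The watchFired assertions above use an existing test helper; its assumed shape gives pending notifications a short window and reports whether any registered watch channel fired, so "if watchFired(ws)" fails the test when a write unexpectedly invalidated a query:

// Assumed shape of the existing watchFired helper.
func watchFired(ws memdb.WatchSet) bool {
	timedOut := ws.Watch(time.After(50 * time.Millisecond))
	return !timedOut
}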
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index c011a7166..c4cfae486 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -53,6 +53,7 @@ const (
 	ReconcileJobSummariesRequestType
 	VaultAccessorRegisterRequestType
 	VaultAccessorDegisterRequestType
+	ApplyPlanResultsRequestType
 )

 const (
@@ -360,6 +361,19 @@ type PlanRequest struct {
 	WriteRequest
 }

+// ApplyPlanResultsRequest is used by the planner to apply a Raft transaction
+// committing the result of a plan.
+type ApplyPlanResultsRequest struct {
+	// AllocUpdateRequest holds the allocation updates to be made by the
+	// scheduler.
+	AllocUpdateRequest
+
+	// CreatedDeployment is the deployment created as a result of a scheduling
+	// event. Any existing deployment should be cancelled when the new
+	// deployment is created.
+	CreatedDeployment *Deployment
+}
+
 // AllocUpdateRequest is used to submit changes to allocations, either
 // to cause evictions or to assign new allocations. Both can be done
 // within a single transaction
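A closing note on the struct above: because AllocUpdateRequest is embedded rather than named, its fields promote to the top level of ApplyPlanResultsRequest, which is why applyPlan can append to req.Alloc directly. A sketch (names as in applyPlan; newAllocs is illustrative):

req := structs.ApplyPlanResultsRequest{
	AllocUpdateRequest: structs.AllocUpdateRequest{
		Job:   job,
		Alloc: make([]*structs.Allocation, 0, minUpdates),
	},
	CreatedDeployment: plan.CreatedDeployment,
}
// req.Alloc resolves to req.AllocUpdateRequest.Alloc via field promotion.
req.Alloc = append(req.Alloc, newAllocs...)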