Create Deployments through plan application

This commit is contained in:
Alex Dadgar
2017-05-05 13:52:01 -07:00
parent bb0bdbddde
commit 64d7fd8330
9 changed files with 608 additions and 30 deletions

View File

@@ -155,6 +155,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpsertVaultAccessor(buf[1:], log.Index)
case structs.VaultAccessorDegisterRequestType:
return n.applyDeregisterVaultAccessor(buf[1:], log.Index)
case structs.ApplyPlanResultsRequestType:
return n.applyPlanResults(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -545,6 +547,22 @@ func (n *nomadFSM) applyDeregisterVaultAccessor(buf []byte, index uint64) interf
return nil
}
// applyPlanApply applies the results of a plan application
func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now())
var req structs.ApplyPlanResultsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertPlanResults(index, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: ApplyPlan failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()

View File

@@ -1484,3 +1484,96 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out2)
}
}
func TestFSM_ApplyPlanResults(t *testing.T) {
fsm := testFSM(t)
// Create the request and create a deployment
alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil
d := mock.Deployment()
d.JobID = job.ID
d.JobModifyIndex = job.ModifyIndex
d.JobVersion = job.Version
alloc.DeploymentID = d.ID
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
},
CreatedDeployment: d,
}
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify the allocation is registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
// Job should be re-attached
alloc.Job = job
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
dout, err := fsm.State().DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if tg, ok := dout.TaskGroups[alloc.TaskGroup]; !ok || tg.PlacedAllocs != 1 {
t.Fatalf("err: %v %v", tg, err)
}
// Ensure that the original job is used
evictAlloc := alloc.Copy()
job = mock.Job()
job.Priority = 123
evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
req2 := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
},
}
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("alloc found!")
}
if out.Job == nil || out.Job.Priority == 123 {
t.Fatalf("bad job")
}
}

View File

@@ -559,7 +559,7 @@ func (j *Job) GetJobVersions(args *structs.JobSpecificRequest,
reply.Index = out[0].ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := state.Index("job_versions")
index, err := state.Index("job_version")
if err != nil {
return err
}

View File

@@ -310,11 +310,16 @@ func VaultAccessor() *structs.VaultAccessor {
func Deployment() *structs.Deployment {
return &structs.Deployment{
ID: structs.GenerateUUID(),
JobID: structs.GenerateUUID(),
JobVersion: 2,
JobModifyIndex: 20,
JobCreateIndex: 18,
ID: structs.GenerateUUID(),
JobID: structs.GenerateUUID(),
JobVersion: 2,
JobModifyIndex: 20,
JobCreateIndex: 18,
TaskGroups: map[string]*structs.DeploymentState{
"web": &structs.DeploymentState{
DesiredTotal: 10,
},
},
Status: structs.DeploymentStatusRunning,
StatusDescription: structs.DeploymentStatusRunning,
ModifyIndex: 23,

View File

@@ -106,7 +106,7 @@ func (s *Server) planApply() {
}
// Dispatch the Raft transaction for the plan
future, err := s.applyPlan(pending.plan.Job, result, snap)
future, err := s.applyPlan(pending.plan, result, snap)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
pending.respond(nil, err)
@@ -120,16 +120,22 @@ func (s *Server) planApply() {
}
// applyPlan is used to apply the plan result and to return the alloc index
func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the miniumum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
// Grab the job
job := plan.Job
// Setup the update request
req := structs.AllocUpdateRequest{
Job: job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
},
CreatedDeployment: plan.CreatedDeployment,
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
@@ -148,20 +154,15 @@ func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *s
}
// Dispatch the Raft transaction
future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
future, err := s.raftApplyFuture(structs.ApplyPlanResultsRequestType, &req)
if err != nil {
return nil, err
}
// Optimistically apply to our state view
if snap != nil {
// Attach the job to all the allocations. It is pulled out in the
// payload to avoid the redundancy of encoding, but should be denormalized
// prior to being inserted into MemDB.
structs.DenormalizeAllocationJobs(req.Job, req.Alloc)
nextIdx := s.raft.AppliedIndex() + 1
if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
if err := snap.UpsertPlanResults(nextIdx, &req); err != nil {
return future, err
}
}

View File

@@ -70,7 +70,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
// Register alloc
alloc := mock.Alloc()
s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID))
plan := &structs.PlanResult{
planRes := &structs.PlanResult{
NodeAllocation: map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc},
},
@@ -82,8 +82,14 @@ func TestPlanApply_applyPlan(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Create the plan with a deployment
plan := &structs.Plan{
Job: alloc.Job,
CreatedDeployment: mock.Deployment(),
}
// Apply the plan
future, err := s1.applyPlan(alloc.Job, plan, snap)
future, err := s1.applyPlan(plan, planRes, snap)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -94,6 +100,10 @@ func TestPlanApply_applyPlan(t *testing.T) {
t.Fatalf("bad: %v %v", out, err)
}
if out, err := snap.DeploymentByID(ws, plan.CreatedDeployment.ID); err != nil || out == nil {
t.Fatalf("bad: %v %v", out, err)
}
// Check plan does apply cleanly
index, err := planWaitFuture(future)
if err != nil {
@@ -104,7 +114,8 @@ func TestPlanApply_applyPlan(t *testing.T) {
}
// Lookup the allocation
out, err := s1.fsm.State().AllocByID(ws, alloc.ID)
fsmState := s1.fsm.State()
out, err := fsmState.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -112,6 +123,15 @@ func TestPlanApply_applyPlan(t *testing.T) {
t.Fatalf("missing alloc")
}
// Lookup the deployment
dout, err := fsmState.DeploymentByID(ws, plan.CreatedDeployment.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if dout == nil {
t.Fatalf("missing deployment")
}
// Evict alloc, Register alloc2
allocEvict := new(structs.Allocation)
*allocEvict = *alloc
@@ -120,7 +140,7 @@ func TestPlanApply_applyPlan(t *testing.T) {
allocEvict.Job = nil
alloc2 := mock.Alloc()
s1.State().UpsertJobSummary(1500, mock.JobSummary(alloc2.JobID))
plan = &structs.PlanResult{
planRes = &structs.PlanResult{
NodeUpdate: map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{allocEvict},
},
@@ -136,7 +156,10 @@ func TestPlanApply_applyPlan(t *testing.T) {
}
// Apply the plan
future, err = s1.applyPlan(job, plan, snap)
plan = &structs.Plan{
Job: job,
}
future, err = s1.applyPlan(plan, planRes, snap)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@@ -85,11 +85,69 @@ func (s *StateStore) Abandon() {
close(s.abandonCh)
}
// UpsertPlanResults is used to upsert the results of a plan.
func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Upsert the newly created deployment
if results.CreatedDeployment != nil {
if err := s.upsertDeploymentImpl(index, results.CreatedDeployment, true, txn); err != nil {
return err
}
}
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
structs.DenormalizeAllocationJobs(results.Job, results.Alloc)
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range results.Alloc {
if alloc.Resources != nil {
continue
}
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}
// Upsert the allocations
if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil {
return err
}
txn.Commit()
return nil
}
// UpsertJobSummary upserts a job summary into the state store.
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Check if the job summary already exists
existing, err := txn.First("jobSummary", "id", jobSummary.JobID)
if err != nil {
return fmt.Errorf("periodic launch lookup failed: %v", err)
}
// Setup the indexes correctly
if existing != nil {
jobSummary.CreateIndex = existing.(*structs.JobSummary).CreateIndex
jobSummary.ModifyIndex = index
} else {
jobSummary.CreateIndex = index
jobSummary.ModifyIndex = index
}
// Update the index
if err := txn.Insert("job_summary", jobSummary); err != nil {
return err
@@ -124,7 +182,14 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error {
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment, cancelPrior bool) error {
txn := s.db.Txn(true)
defer txn.Abort()
if err := s.upsertDeploymentImpl(index, deployment, cancelPrior, txn); err != nil {
return err
}
txn.Commit()
return nil
}
func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, cancelPrior bool, txn *memdb.Txn) error {
// Go through and cancel any active deployment for the job.
if cancelPrior {
iter, err := txn.Get("deployment", "job_prefix", deployment.JobID)
@@ -157,6 +222,21 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme
}
}
// Check if the deployment already exists
existing, err := txn.First("deployment", "id", deployment.ID)
if err != nil {
return fmt.Errorf("periodic launch lookup failed: %v", err)
}
// Setup the indexes correctly
if existing != nil {
deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex
deployment.ModifyIndex = index
} else {
deployment.CreateIndex = index
deployment.ModifyIndex = index
}
// Insert the deployment
if err := txn.Insert("deployment", deployment); err != nil {
return err
@@ -167,7 +247,6 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
@@ -186,7 +265,10 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error
func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
return s.deploymentByIDImpl(ws, deploymentID, txn)
}
func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *memdb.Txn) (*structs.Deployment, error) {
watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %v", err)
@@ -200,7 +282,7 @@ func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*st
return nil, nil
}
func (s *StateStore) DeploymentByJobID(ws memdb.WatchSet, jobID string) ([]*structs.Deployment, error) {
func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, jobID string) ([]*structs.Deployment, error) {
txn := s.db.Txn(false)
// Get an iterator over the deployments
@@ -231,6 +313,39 @@ func (s *StateStore) DeploymentByJobID(ws memdb.WatchSet, jobID string) ([]*stru
return out, nil
}
func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
// Get an iterator over the deployments
iter, err := txn.Get("deployment", "job_prefix", jobID)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
var out *structs.Deployment
for {
raw := iter.Next()
if raw == nil {
break
}
d := raw.(*structs.Deployment)
// Filter non-exact matches
if d.JobID != jobID {
continue
}
if out == nil || out.CreateIndex < d.CreateIndex {
out = d
}
}
return out, nil
}
// DeleteDeployment is used to delete a deployment by ID
func (s *StateStore) DeleteDeployment(index uint64, deploymentID string) error {
txn := s.db.Txn(true)
@@ -1280,12 +1395,21 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
return nil
}
// UpsertAllocs is used to evict a set of allocations
// and allocate new ones at the same time.
// UpsertAllocs is used to evict a set of allocations and allocate new ones at
// the same time.
func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error {
txn := s.db.Txn(true)
defer txn.Abort()
if err := s.upsertAllocsImpl(index, allocs, txn); err != nil {
return err
}
txn.Commit()
return nil
}
// upsertAllocs is the actual implementation of UpsertAllocs so that it may be
// used with an existing transaction.
func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *memdb.Txn) error {
// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
@@ -1319,6 +1443,9 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
alloc.ModifyIndex = index
alloc.AllocModifyIndex = index
// Keep the clients task states
alloc.TaskStates = exist.TaskStates
// If the scheduler is marking this allocation as lost we do not
// want to reuse the status of the existing allocation.
if alloc.ClientStatus != structs.AllocClientStatusLost {
@@ -1332,6 +1459,10 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
}
}
if err := s.updateDeploymentWithAlloc(index, alloc, exist, txn); err != nil {
return fmt.Errorf("error updating deployment: %v", err)
}
if err := s.updateSummaryWithAlloc(index, alloc, exist, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
@@ -1364,7 +1495,6 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("setting job status failed: %v", err)
}
txn.Commit()
return nil
}
@@ -2034,6 +2164,86 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
return nil
}
// updateDeploymentWithAlloc is used to update the deployment state associated
// with the given allocation
func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *memdb.Txn) error {
// Nothing to do if the allocation is not associated with a deployment
if alloc.DeploymentID == "" {
return nil
}
// Get the deployment
ws := memdb.NewWatchSet()
deployment, err := s.deploymentByIDImpl(ws, alloc.DeploymentID, txn)
if err != nil {
return err
}
if deployment == nil {
return fmt.Errorf("allocation %q references unknown deployment %q", alloc.ID, alloc.DeploymentID)
}
// Retrieve the deployment state object
_, ok := deployment.TaskGroups[alloc.TaskGroup]
if !ok {
// If the task group isn't part of the deployment, the task group wasn't
// part of a rolling update so nothing to do
return nil
}
// Do not modify in-place. Instead keep track of what must be done
placed := 0
// TODO test when I am sure of what this method will do
// XXX Unclear whether this will be helpful because a seperate code path is
// likely need for setting health
healthy := 0
unhealthy := 0
// If there was no existing allocation, this is a placement and we increment
// the placement
if existing == nil {
placed++
} else if existing.DeploymentHealth == nil && alloc.DeploymentHealth != nil {
if *alloc.DeploymentHealth {
healthy++
} else {
unhealthy++
}
} else if existing.DeploymentHealth != nil && alloc.DeploymentHealth != nil {
// See if it has gone from healthy to unhealthy
if *existing.DeploymentHealth && !*alloc.DeploymentHealth {
healthy--
unhealthy++
}
}
// Nothing to do
if placed == 0 && healthy == 0 && unhealthy == 0 {
return nil
}
// Create a copy of the deployment object
deploymentCopy := deployment.Copy()
deploymentCopy.ModifyIndex = index
if unhealthy != 0 {
deploymentCopy.Status = structs.DeploymentStatusFailed
deploymentCopy.StatusDescription = "Allocation(s) marked as unhealthy"
}
state := deploymentCopy.TaskGroups[alloc.TaskGroup]
state.PlacedAllocs += placed
state.HealthyAllocs += healthy
state.UnhealthyAllocs += unhealthy
// Upsert the new deployment
if err := s.upsertDeploymentImpl(index, deploymentCopy, false, txn); err != nil {
return err
}
return nil
}
// updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted
func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation,

View File

@@ -25,13 +25,218 @@ func testStateStore(t *testing.T) *StateStore {
return state
}
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are created
func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil
if err := state.UpsertJob(999, job); err != nil {
t.Fatalf("err: %v", err)
}
// Create a plan result
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
Job: job,
},
}
err := state.UpsertPlanResults(1000, &res)
if err != nil {
t.Fatalf("err: %v", err)
}
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
index, err := state.Index("allocs")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1000 {
t.Fatalf("bad: %d", index)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
// This test checks that the deployment is created and allocations count towards
// the deployment
func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc2 := mock.Alloc()
job := alloc.Job
alloc.Job = nil
alloc2.Job = nil
d := mock.Deployment()
alloc.DeploymentID = d.ID
alloc2.DeploymentID = d.ID
if err := state.UpsertJob(999, job); err != nil {
t.Fatalf("err: %v", err)
}
// Create a plan result
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc, alloc2},
Job: job,
},
CreatedDeployment: d,
}
err := state.UpsertPlanResults(1000, &res)
if err != nil {
t.Fatalf("err: %v", err)
}
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
dout, err := state.DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if dout == nil {
t.Fatalf("bad: nil deployment")
}
tg, ok := dout.TaskGroups[alloc.TaskGroup]
if !ok {
t.Fatalf("bad: nil deployment state")
}
if tg == nil || tg.PlacedAllocs != 2 {
t.Fatalf("bad: %v", dout)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
// This test checks that when a new deployment is made, the old ones are
// cancelled.
func TestStateStore_UpsertPlanResults_Deployment_CancelOld(t *testing.T) {
state := testStateStore(t)
// Create a job that applies to all
job := mock.Job()
if err := state.UpsertJob(998, job); err != nil {
t.Fatalf("err: %v", err)
}
// Create two deployments:
// One that is already terminal and assert its modify index isn't touched
// A second that is outstanding and assert it gets cancelled.
dterminal := mock.Deployment()
dterminal.Status = structs.DeploymentStatusFailed
dterminal.JobID = job.ID
doutstanding := mock.Deployment()
doutstanding.JobID = job.ID
if err := state.UpsertDeployment(999, dterminal, false); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertDeployment(1000, doutstanding, false); err != nil {
t.Fatalf("err: %v", err)
}
alloc := mock.Alloc()
alloc2 := mock.Alloc()
alloc.Job = nil
alloc2.Job = nil
dnew := mock.Deployment()
dnew.JobID = job.ID
alloc.DeploymentID = dnew.ID
alloc2.DeploymentID = dnew.ID
// Create a plan result
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc, alloc2},
Job: job,
},
CreatedDeployment: dnew,
}
err := state.UpsertPlanResults(1000, &res)
if err != nil {
t.Fatalf("err: %v", err)
}
ws := memdb.NewWatchSet()
// Check the deployments are correctly updated.
dout, err := state.DeploymentByID(ws, dnew.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if dout == nil {
t.Fatalf("bad: nil deployment")
}
tg, ok := dout.TaskGroups[alloc.TaskGroup]
if !ok {
t.Fatalf("bad: nil deployment state")
}
if tg == nil || tg.PlacedAllocs != 2 {
t.Fatalf("bad: %v", dout)
}
dterminalout, err := state.DeploymentByID(ws, dterminal.ID)
if err != nil || dterminalout == nil {
t.Fatalf("bad: %v %v", err, dterminalout)
}
if !reflect.DeepEqual(dterminalout, dterminal) {
t.Fatalf("bad: %v %v", dterminal, dterminalout)
}
doutstandingout, err := state.DeploymentByID(ws, doutstanding.ID)
if err != nil || doutstandingout == nil {
t.Fatalf("bad: %v %v", err, doutstandingout)
}
if doutstandingout.Status != structs.DeploymentStatusCancelled || doutstandingout.ModifyIndex != 1000 {
t.Fatalf("bad: %v", doutstandingout)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_UpsertDeployment(t *testing.T) {
state := testStateStore(t)
deployment := mock.Deployment()
// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
_, err := state.DeploymentByJobID(ws, deployment.ID)
_, err := state.DeploymentsByJobID(ws, deployment.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
@@ -103,7 +308,7 @@ func TestStateStore_UpsertDeployment_Cancel(t *testing.T) {
}
// Get the deployments by job
deployments, err := state.DeploymentByJobID(ws, deployment.JobID)
deployments, err := state.DeploymentsByJobID(ws, deployment.JobID)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -112,6 +317,15 @@ func TestStateStore_UpsertDeployment_Cancel(t *testing.T) {
t.Fatalf("got %d deployments; want %v", l, 2)
}
latest, err := state.LatestDeploymentByJobID(ws, deployment.JobID)
if err != nil {
t.Fatalf("err: %v", err)
}
if latest == nil || latest.CreateIndex != 1001 {
t.Fatalf("bad: %+v", latest)
}
index, err := state.Index("deployment")
if err != nil {
t.Fatalf("err: %v", err)

View File

@@ -53,6 +53,7 @@ const (
ReconcileJobSummariesRequestType
VaultAccessorRegisterRequestType
VaultAccessorDegisterRequestType
ApplyPlanResultsRequestType
)
const (
@@ -360,6 +361,19 @@ type PlanRequest struct {
WriteRequest
}
// ApplyPlanResultsRequest is used by the planner to apply a Raft transaction
// committing the result of a plan.
type ApplyPlanResultsRequest struct {
// AllocUpdateRequest holds the allocation updates to be made by the
// scheduler.
AllocUpdateRequest
// CreatedDeployment is the deployment created as a result of a scheduling
// event. Any existing deployment should be cancelled when the new
// deployment is created.
CreatedDeployment *Deployment
}
// AllocUpdateRequest is used to submit changes to allocations, either
// to cause evictions or to assign new allocaitons. Both can be done
// within a single transaction