mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 18:05:42 +03:00
Add comments to functions, and use require instead of assert
This commit is contained in:
@@ -157,7 +157,6 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
|
||||
Deployment: result.Deployment,
|
||||
DeploymentUpdates: result.DeploymentUpdates,
|
||||
EvalID: plan.EvalID,
|
||||
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
|
||||
}
|
||||
|
||||
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
|
||||
@@ -167,8 +166,9 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
|
||||
// Initialize the allocs request using the new optimized log entry format.
|
||||
// Determine the minimum number of updates, could be more if there
|
||||
// are multiple updates per node
|
||||
req.AllocsStopped = make([]*structs.Allocation, 0, len(result.NodeUpdate))
|
||||
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
|
||||
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
|
||||
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
|
||||
|
||||
for _, updateList := range result.NodeUpdate {
|
||||
for _, stoppedAlloc := range updateList {
|
||||
@@ -186,7 +186,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
|
||||
|
||||
for _, preemptions := range result.NodePreemptions {
|
||||
for _, preemptedAlloc := range preemptions {
|
||||
req.NodePreemptions = append(req.NodePreemptions, normalizePreemptedAlloc(preemptedAlloc, now))
|
||||
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, now))
|
||||
|
||||
// Gather jobids to create follow up evals
|
||||
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
|
||||
@@ -201,8 +201,9 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
|
||||
minUpdates := len(result.NodeUpdate)
|
||||
minUpdates += len(result.NodeAllocation)
|
||||
|
||||
// Initialize the allocs request using the older log entry format
|
||||
// Initialize using the older log entry format for Alloc and NodePreemptions
|
||||
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
|
||||
req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))
|
||||
|
||||
for _, updateList := range result.NodeUpdate {
|
||||
req.Alloc = append(req.Alloc, updateList...)
|
||||
@@ -261,16 +262,24 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
|
||||
return future, nil
|
||||
}
|
||||
|
||||
func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.Allocation {
|
||||
return &structs.Allocation{
|
||||
// normalizePreemptedAlloc removes redundant fields from a preempted allocation and
|
||||
// returns AllocationDiff. Since a preempted allocation is always an existing allocation,
|
||||
// the struct returned by this method contains only the differential, which can be
|
||||
// applied to an existing allocation, to yield the updated struct
|
||||
func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
|
||||
return &structs.AllocationDiff{
|
||||
ID: preemptedAlloc.ID,
|
||||
PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
|
||||
ModifyTime: now,
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.Allocation {
|
||||
return &structs.Allocation{
|
||||
// normalizeStoppedAlloc removes redundant fields from a stopped allocation and
|
||||
// returns AllocationDiff. Since a stopped allocation is always an existing allocation,
|
||||
// the struct returned by this method contains only the differential, which can be
|
||||
// applied to an existing allocation, to yield the updated struct
|
||||
func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
|
||||
return &structs.AllocationDiff{
|
||||
ID: stoppedAlloc.ID,
|
||||
DesiredDescription: stoppedAlloc.DesiredDescription,
|
||||
ClientStatus: stoppedAlloc.ClientStatus,
|
||||
@@ -278,6 +287,7 @@ func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs
|
||||
}
|
||||
}
|
||||
|
||||
// appendNamespacedJobID appends the namespaced Job ID for the alloc to the jobIDs set
|
||||
func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
|
||||
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
|
||||
if _, ok := jobIDs[id]; !ok {
|
||||
@@ -285,6 +295,8 @@ func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *stru
|
||||
}
|
||||
}
|
||||
|
||||
// updateAllocTimestamps sets the CreateTime and ModifyTime for the allocations
|
||||
// to the timestamp provided
|
||||
func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
|
||||
for _, alloc := range allocations {
|
||||
if alloc.CreateTime == 0 {
|
||||
|
||||
@@ -230,6 +230,8 @@ func TestPlanApply_applyPlan(t *testing.T) {
|
||||
assert.Equal(index, evalOut.ModifyIndex)
|
||||
}
|
||||
|
||||
// Verifies that applyPlan properly updates the constituent objects in MemDB,
|
||||
// when the plan contains normalized allocs.
|
||||
func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
@@ -312,39 +314,41 @@ func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) {
|
||||
EvalID: eval.ID,
|
||||
}
|
||||
|
||||
require := require.New(t)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Apply the plan
|
||||
future, err := s1.applyPlan(plan, planRes, snap)
|
||||
assert := assert.New(t)
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
|
||||
// Verify our optimistic snapshot is updated
|
||||
ws := memdb.NewWatchSet()
|
||||
allocOut, err := snap.AllocByID(ws, alloc.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(allocOut)
|
||||
require.NoError(err)
|
||||
require.NotNil(allocOut)
|
||||
|
||||
deploymentOut, err := snap.DeploymentByID(ws, plan.Deployment.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(deploymentOut)
|
||||
require.NoError(err)
|
||||
require.NotNil(deploymentOut)
|
||||
|
||||
// Check plan does apply cleanly
|
||||
index, err := planWaitFuture(future)
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
assert.NotEqual(0, index)
|
||||
|
||||
// Lookup the allocation
|
||||
fsmState := s1.fsm.State()
|
||||
allocOut, err = fsmState.AllocByID(ws, alloc.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(allocOut)
|
||||
require.NoError(err)
|
||||
require.NotNil(allocOut)
|
||||
assert.True(allocOut.CreateTime > 0)
|
||||
assert.True(allocOut.ModifyTime > 0)
|
||||
assert.Equal(allocOut.CreateTime, allocOut.ModifyTime)
|
||||
|
||||
// Verify stopped alloc diff applied cleanly
|
||||
updatedStoppedAlloc, err := fsmState.AllocByID(ws, stoppedAlloc.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(updatedStoppedAlloc)
|
||||
require.NoError(err)
|
||||
require.NotNil(updatedStoppedAlloc)
|
||||
assert.True(updatedStoppedAlloc.ModifyTime > timestampBeforeCommit)
|
||||
assert.Equal(updatedStoppedAlloc.DesiredDescription, stoppedAllocDiff.DesiredDescription)
|
||||
assert.Equal(updatedStoppedAlloc.ClientStatus, stoppedAllocDiff.ClientStatus)
|
||||
@@ -352,8 +356,8 @@ func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) {
|
||||
|
||||
// Verify preempted alloc diff applied cleanly
|
||||
updatedPreemptedAlloc, err := fsmState.AllocByID(ws, preemptedAlloc.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(updatedPreemptedAlloc)
|
||||
require.NoError(err)
|
||||
require.NotNil(updatedPreemptedAlloc)
|
||||
assert.True(updatedPreemptedAlloc.ModifyTime > timestampBeforeCommit)
|
||||
assert.Equal(updatedPreemptedAlloc.DesiredDescription,
|
||||
"Preempted by alloc ID "+preemptedAllocDiff.PreemptedByAllocation)
|
||||
@@ -361,20 +365,20 @@ func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) {
|
||||
|
||||
// Lookup the new deployment
|
||||
dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(dout)
|
||||
require.NoError(err)
|
||||
require.NotNil(dout)
|
||||
|
||||
// Lookup the updated deployment
|
||||
dout2, err := fsmState.DeploymentByID(ws, oldDeployment.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(dout2)
|
||||
require.NoError(err)
|
||||
require.NotNil(dout2)
|
||||
assert.Equal(desiredStatus, dout2.Status)
|
||||
assert.Equal(desiredStatusDescription, dout2.StatusDescription)
|
||||
|
||||
// Lookup updated eval
|
||||
evalOut, err := fsmState.EvalByID(ws, eval.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(evalOut)
|
||||
require.NoError(err)
|
||||
require.NotNil(evalOut)
|
||||
assert.Equal(index, evalOut.ModifyIndex)
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,9 @@ import (
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
// This test compares the size of the normalized + OmitEmpty raft plan log entry
|
||||
// with the earlier denormalized log.
|
||||
//
|
||||
// Whenever this test is changed, care should be taken to ensure the older msgpack size
|
||||
// is recalculated when new fields are introduced in ApplyPlanResultsRequest
|
||||
func TestPlanNormalize(t *testing.T) {
|
||||
@@ -33,12 +36,12 @@ func TestPlanNormalize(t *testing.T) {
|
||||
}
|
||||
|
||||
now := time.Now().UTC().UnixNano()
|
||||
mockStoppedAllocSlice := make([]*structs.Allocation, numStoppedAllocs)
|
||||
mockStoppedAllocSlice := make([]*structs.AllocationDiff, numStoppedAllocs)
|
||||
for i := 0; i < numStoppedAllocs; i++ {
|
||||
mockStoppedAllocSlice = append(mockStoppedAllocSlice, normalizeStoppedAlloc(mockAlloc, now))
|
||||
}
|
||||
|
||||
mockPreemptionAllocSlice := make([]*structs.Allocation, numPreemptedAllocs)
|
||||
mockPreemptionAllocSlice := make([]*structs.AllocationDiff, numPreemptedAllocs)
|
||||
for i := 0; i < numPreemptedAllocs; i++ {
|
||||
mockPreemptionAllocSlice = append(mockPreemptionAllocSlice, normalizePreemptedAlloc(mockAlloc, now))
|
||||
}
|
||||
@@ -49,7 +52,7 @@ func TestPlanNormalize(t *testing.T) {
|
||||
AllocsUpdated: mockUpdatedAllocSlice,
|
||||
AllocsStopped: mockStoppedAllocSlice,
|
||||
},
|
||||
NodePreemptions: mockPreemptionAllocSlice,
|
||||
AllocsPreempted: mockPreemptionAllocSlice,
|
||||
}
|
||||
|
||||
handle := structs.MsgpackHandle
|
||||
|
||||
@@ -175,12 +175,18 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
|
||||
return err
|
||||
}
|
||||
|
||||
err = snapshot.DenormalizeAllocationsSlice(results.AllocsStopped, results.Job)
|
||||
allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = snapshot.DenormalizeAllocationsSlice(results.NodePreemptions, results.Job)
|
||||
allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
|
||||
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -210,32 +216,32 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
|
||||
}
|
||||
}
|
||||
|
||||
noOfAllocs := len(results.NodePreemptions)
|
||||
|
||||
if len(results.Alloc) > 0 {
|
||||
numAllocs := 0
|
||||
if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
|
||||
// COMPAT 0.11: This branch will be removed, when Alloc is removed
|
||||
// Attach the job to all the allocations. It is pulled out in the payload to
|
||||
// avoid the redundancy of encoding, but should be denormalized prior to
|
||||
// being inserted into MemDB.
|
||||
addComputedAllocAttrs(results.Alloc, results.Job)
|
||||
noOfAllocs += len(results.Alloc)
|
||||
numAllocs = len(results.Alloc) + len(results.NodePreemptions)
|
||||
} else {
|
||||
// Attach the job to all the allocations. It is pulled out in the payload to
|
||||
// avoid the redundancy of encoding, but should be denormalized prior to
|
||||
// being inserted into MemDB.
|
||||
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
|
||||
noOfAllocs += len(results.AllocsStopped) + len(results.AllocsUpdated)
|
||||
numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
|
||||
}
|
||||
|
||||
allocsToUpsert := make([]*structs.Allocation, 0, noOfAllocs)
|
||||
allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
|
||||
|
||||
// COMPAT 0.11: This append should be removed when Alloc is removed
|
||||
// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
|
||||
allocsToUpsert = append(allocsToUpsert, results.Alloc...)
|
||||
|
||||
allocsToUpsert = append(allocsToUpsert, results.AllocsStopped...)
|
||||
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
|
||||
allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
|
||||
|
||||
allocsToUpsert = append(allocsToUpsert, allocsStopped...)
|
||||
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
|
||||
allocsToUpsert = append(allocsToUpsert, allocsPreempted...)
|
||||
|
||||
if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -251,6 +257,8 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
|
||||
return nil
|
||||
}
|
||||
|
||||
// addComputedAllocAttrs adds the computed/derived attributes to the allocation.
|
||||
// This method is used when an allocation is being denormalized.
|
||||
func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
|
||||
structs.DenormalizeAllocationJobs(job, allocs)
|
||||
|
||||
@@ -4111,24 +4119,40 @@ type StateSnapshot struct {
|
||||
// Allocation for each of the Allocation diffs and merges the updated attributes with
|
||||
// the existing Allocation, and attaches the Job provided
|
||||
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error {
|
||||
for _, allocDiffs := range nodeAllocations {
|
||||
if err := s.DenormalizeAllocationsSlice(allocDiffs, job); err != nil {
|
||||
for nodeID, allocs := range nodeAllocations {
|
||||
denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodeAllocations[nodeID] = denormalizedAllocs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DenormalizeAllocationsSlice queries the Allocation for each of the Allocation diffs and merges
|
||||
// DenormalizeAllocationSlice queries the Allocation for each allocation diff
|
||||
// represented as an Allocation and merges the updated attributes with the existing
|
||||
// Allocation, and attaches the Job provided.
|
||||
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) {
|
||||
allocDiffs := make([]*structs.AllocationDiff, len(allocs))
|
||||
for i, alloc := range allocs {
|
||||
allocDiffs[i] = alloc.AllocationDiff()
|
||||
}
|
||||
|
||||
return s.DenormalizeAllocationDiffSlice(allocDiffs, job)
|
||||
}
|
||||
|
||||
// DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges
|
||||
// the updated attributes with the existing Allocation, and attaches the Job provided
|
||||
func (s *StateSnapshot) DenormalizeAllocationsSlice(allocDiffs []*structs.Allocation, job *structs.Job) error {
|
||||
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, job *structs.Job) ([]*structs.Allocation, error) {
|
||||
// Output index for denormalized Allocations
|
||||
j := 0
|
||||
|
||||
denormalizedAllocs := make([]*structs.Allocation, len(allocDiffs))
|
||||
for _, allocDiff := range allocDiffs {
|
||||
alloc, err := s.AllocByID(nil, allocDiff.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alloc lookup failed: %v", err)
|
||||
return nil, fmt.Errorf("alloc lookup failed: %v", err)
|
||||
}
|
||||
if alloc == nil {
|
||||
continue
|
||||
@@ -4156,12 +4180,12 @@ func (s *StateSnapshot) DenormalizeAllocationsSlice(allocDiffs []*structs.Alloca
|
||||
}
|
||||
|
||||
// Update the allocDiff in the slice to equal the denormalized alloc
|
||||
allocDiffs[j] = allocCopy
|
||||
denormalizedAllocs[j] = allocCopy
|
||||
j++
|
||||
}
|
||||
// Retain only the denormalized Allocations in the slice
|
||||
allocDiffs = allocDiffs[:j]
|
||||
return nil
|
||||
denormalizedAllocs = denormalizedAllocs[:j]
|
||||
return denormalizedAllocs, nil
|
||||
}
|
||||
|
||||
func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string {
|
||||
|
||||
@@ -152,56 +152,50 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
|
||||
|
||||
stoppedAlloc := mock.Alloc()
|
||||
stoppedAlloc.Job = job
|
||||
stoppedAllocDiff := &structs.Allocation{
|
||||
stoppedAllocDiff := &structs.AllocationDiff{
|
||||
ID: stoppedAlloc.ID,
|
||||
DesiredDescription: "desired desc",
|
||||
ClientStatus: structs.AllocClientStatusLost,
|
||||
}
|
||||
preemptedAlloc := mock.Alloc()
|
||||
preemptedAlloc.Job = job
|
||||
preemptedAllocDiff := &structs.Allocation{
|
||||
preemptedAllocDiff := &structs.AllocationDiff{
|
||||
ID: preemptedAlloc.ID,
|
||||
PreemptedByAllocation: alloc.ID,
|
||||
}
|
||||
|
||||
if err := state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := state.UpsertJob(999, job); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
require := require.New(t)
|
||||
require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}))
|
||||
require.NoError(state.UpsertJob(999, job))
|
||||
|
||||
eval := mock.Eval()
|
||||
eval.JobID = job.ID
|
||||
|
||||
// Create an eval
|
||||
if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
require.NoError(state.UpsertEvals(1, []*structs.Evaluation{eval}))
|
||||
|
||||
// Create a plan result
|
||||
res := structs.ApplyPlanResultsRequest{
|
||||
AllocUpdateRequest: structs.AllocUpdateRequest{
|
||||
AllocsUpdated: []*structs.Allocation{alloc},
|
||||
AllocsStopped: []*structs.Allocation{stoppedAllocDiff},
|
||||
AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff},
|
||||
Job: job,
|
||||
},
|
||||
EvalID: eval.ID,
|
||||
NodePreemptions: []*structs.Allocation{preemptedAllocDiff},
|
||||
AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff},
|
||||
}
|
||||
assert := assert.New(t)
|
||||
planModifyIndex := uint64(1000)
|
||||
err := state.UpsertPlanResults(planModifyIndex, &res)
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := state.AllocByID(ws, alloc.ID)
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
assert.Equal(alloc, out)
|
||||
|
||||
updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID)
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription)
|
||||
assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus)
|
||||
assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus)
|
||||
@@ -209,23 +203,21 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
|
||||
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
|
||||
|
||||
updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID)
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus)
|
||||
assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation)
|
||||
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
|
||||
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
|
||||
|
||||
index, err := state.Index("allocs")
|
||||
assert.Nil(err)
|
||||
require.NoError(err)
|
||||
assert.EqualValues(planModifyIndex, index)
|
||||
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
require.False(watchFired(ws))
|
||||
|
||||
evalOut, err := state.EvalByID(ws, eval.ID)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(evalOut)
|
||||
require.NoError(err)
|
||||
require.NotNil(evalOut)
|
||||
assert.EqualValues(planModifyIndex, evalOut.ModifyIndex)
|
||||
}
|
||||
|
||||
|
||||
@@ -660,9 +660,15 @@ type ApplyPlanResultsRequest struct {
|
||||
// the evaluation itself being updated.
|
||||
EvalID string
|
||||
|
||||
// NodePreemptions is a slice of allocation diffs from other lower priority jobs
|
||||
// COMPAT 0.11
|
||||
// NodePreemptions is a slice of allocations from other lower priority jobs
|
||||
// that are preempted. Preempted allocations are marked as evicted.
|
||||
NodePreemptions []*AllocationDiff
|
||||
// Deprecated: Replaced with AllocsPreempted which contains only the diff
|
||||
NodePreemptions []*Allocation
|
||||
|
||||
// AllocsPreempted is a slice of allocation diffs from other lower priority jobs
|
||||
// that are preempted. Preempted allocations are marked as evicted.
|
||||
AllocsPreempted []*AllocationDiff
|
||||
|
||||
// PreemptionEvals is a slice of follow up evals for jobs whose allocations
|
||||
// have been preempted to place allocs in this plan
|
||||
@@ -7294,10 +7300,6 @@ type Allocation struct {
|
||||
ModifyTime int64
|
||||
}
|
||||
|
||||
// AllocationDiff is a type alias for Allocation used to indicate that a diff is
|
||||
// and not the entire allocation
|
||||
type AllocationDiff = Allocation
|
||||
|
||||
// Index returns the index of the allocation. If the allocation is from a task
|
||||
// group with count greater than 1, there will be multiple allocations for it.
|
||||
func (a *Allocation) Index() uint {
|
||||
@@ -7687,6 +7689,19 @@ func (a *Allocation) Stub() *AllocListStub {
|
||||
}
|
||||
}
|
||||
|
||||
// AllocationDiff converts an Allocation type to an AllocationDiff type
|
||||
// If at any time, modification are made to AllocationDiff so that an
|
||||
// Allocation can no longer be safely converted to AllocationDiff,
|
||||
// this method should be changed accordingly.
|
||||
func (a *Allocation) AllocationDiff() *AllocationDiff {
|
||||
return (*AllocationDiff)(a)
|
||||
}
|
||||
|
||||
// AllocationDiff is another named type for Allocation (to use the same fields),
|
||||
// which is used to represent the delta for an Allocation. If you need a method
|
||||
// defined on the al
|
||||
type AllocationDiff Allocation
|
||||
|
||||
// AllocListStub is used to return a subset of alloc information
|
||||
type AllocListStub struct {
|
||||
ID string
|
||||
@@ -8468,6 +8483,8 @@ func (p *Plan) IsNoOp() bool {
|
||||
len(p.DeploymentUpdates) == 0
|
||||
}
|
||||
|
||||
// NormalizeAllocations normalizes allocations to remove fields that can
|
||||
// be fetched from the MemDB instead of sending over the wire
|
||||
func (p *Plan) NormalizeAllocations() {
|
||||
for _, allocs := range p.NodeUpdate {
|
||||
for i, alloc := range allocs {
|
||||
|
||||
@@ -148,8 +148,9 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
|
||||
return true, parts
|
||||
}
|
||||
|
||||
// ServersMeetMinimumVersion returns whether the given alive servers are at least on the
|
||||
// given Nomad version
|
||||
// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
|
||||
// given Nomad version. The checkFailedServers parameter specifies whether version
|
||||
// for the failed servers should be verified.
|
||||
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
|
||||
for _, member := range members {
|
||||
if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
|
||||
|
||||
@@ -104,10 +104,6 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
|
||||
|
||||
// Flatten evicts and allocs
|
||||
now := time.Now().UTC().UnixNano()
|
||||
allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
|
||||
for _, updateList := range plan.NodeUpdate {
|
||||
allocsStopped = append(allocsStopped, updateList...)
|
||||
}
|
||||
|
||||
allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
|
||||
for _, allocList := range plan.NodeAllocation {
|
||||
@@ -115,15 +111,6 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
|
||||
}
|
||||
updateCreateTimestamp(allocsUpdated, now)
|
||||
|
||||
// Set modify time for preempted allocs and flatten them
|
||||
var preemptedAllocs []*structs.Allocation
|
||||
for _, preemptions := range result.NodePreemptions {
|
||||
for _, alloc := range preemptions {
|
||||
alloc.ModifyTime = now
|
||||
preemptedAllocs = append(preemptedAllocs, alloc)
|
||||
}
|
||||
}
|
||||
|
||||
// Setup the update request
|
||||
req := structs.ApplyPlanResultsRequest{
|
||||
AllocUpdateRequest: structs.AllocUpdateRequest{
|
||||
@@ -132,19 +119,53 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
|
||||
Deployment: plan.Deployment,
|
||||
DeploymentUpdates: plan.DeploymentUpdates,
|
||||
EvalID: plan.EvalID,
|
||||
NodePreemptions: preemptedAllocs,
|
||||
}
|
||||
|
||||
if h.optimizePlan {
|
||||
req.AllocsStopped = allocsStopped
|
||||
stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
|
||||
for _, updateList := range plan.NodeUpdate {
|
||||
for _, stoppedAlloc := range updateList {
|
||||
stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff())
|
||||
}
|
||||
}
|
||||
req.AllocsStopped = stoppedAllocDiffs
|
||||
|
||||
req.AllocsUpdated = allocsUpdated
|
||||
|
||||
preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
|
||||
for _, preemptions := range plan.NodePreemptions {
|
||||
for _, preemptedAlloc := range preemptions {
|
||||
allocDiff := preemptedAlloc.AllocationDiff()
|
||||
allocDiff.ModifyTime = now
|
||||
preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff)
|
||||
}
|
||||
}
|
||||
req.AllocsPreempted = preemptedAllocDiffs
|
||||
} else {
|
||||
// COMPAT 0.11: Handles unoptimized log format
|
||||
var allocs []*structs.Allocation
|
||||
|
||||
allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
|
||||
for _, updateList := range plan.NodeUpdate {
|
||||
allocsStopped = append(allocsStopped, updateList...)
|
||||
}
|
||||
allocs = append(allocs, allocsStopped...)
|
||||
|
||||
allocs = append(allocs, allocsUpdated...)
|
||||
updateCreateTimestamp(allocs, now)
|
||||
|
||||
req.Alloc = allocs
|
||||
|
||||
// Set modify time for preempted allocs and flatten them
|
||||
var preemptedAllocs []*structs.Allocation
|
||||
for _, preemptions := range result.NodePreemptions {
|
||||
for _, alloc := range preemptions {
|
||||
alloc.ModifyTime = now
|
||||
preemptedAllocs = append(preemptedAllocs, alloc)
|
||||
}
|
||||
}
|
||||
|
||||
req.NodePreemptions = preemptedAllocs
|
||||
}
|
||||
|
||||
// Apply the full plan
|
||||
|
||||
Reference in New Issue
Block a user