Add tests for plan normalization
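
Plan normalization shrinks what a plan carries through Raft: stopped and
preempted allocations are submitted as minimal diffs (allocation ID plus the
changed fields) rather than full Allocation structs, and are denormalized
against state when the plan is applied. The tests below cover the normalized
applyPlan path, UpsertPlanResults denormalization, Plan.NormalizeAllocations,
the msgpack struct tags, and the version gate via ServersMeetMinimumVersion.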

Author: Arshneet Singh
Date:   2019-03-05 13:41:41 -08:00
parent 4eedab18a7
commit f75c6b4bdb
10 changed files with 6485 additions and 5685 deletions


@@ -56,7 +56,7 @@ func testFSM(t *testing.T) *nomadFSM {
Logger: logger,
Region: "global",
}
- fsm, err := NewFSM(fsmConfig)
+ fsm, err := NewFSM(fsmConfig, TestServer(t, nil))
if err != nil {
t.Fatalf("err: %v", err)
}


@@ -3,6 +3,7 @@ package nomad
import (
"reflect"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
@@ -62,6 +63,7 @@ func testRegisterJob(t *testing.T, s *Server, j *structs.Job) {
}
}
// Deprecated: Tests the older unoptimized code path for applyPlan
func TestPlanApply_applyPlan(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
@@ -228,6 +230,154 @@ func TestPlanApply_applyPlan(t *testing.T) {
assert.Equal(index, evalOut.ModifyIndex)
}
func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) {
t.Parallel()
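// Build 0.9.1 is presumably at or above the minimum server version required
// for plan normalization, enabling the optimized apply path under test.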
s1 := TestServer(t, func(c *Config) {
c.Build = "0.9.1"
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Register node
node := mock.Node()
testRegisterNode(t, s1, node)
// Register a fake deployment
oldDeployment := mock.Deployment()
if err := s1.State().UpsertDeployment(900, oldDeployment); err != nil {
t.Fatalf("UpsertDeployment failed: %v", err)
}
// Create a deployment
dnew := mock.Deployment()
// Create a deployment update for the old deployment id
desiredStatus, desiredStatusDescription := "foo", "bar"
updates := []*structs.DeploymentStatusUpdate{
{
DeploymentID: oldDeployment.ID,
Status: desiredStatus,
StatusDescription: desiredStatusDescription,
},
}
// Register allocs, deployment and deployment update
alloc := mock.Alloc()
stoppedAlloc := mock.Alloc()
stoppedAllocDiff := &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: "Desired Description",
ClientStatus: structs.AllocClientStatusLost,
}
preemptedAlloc := mock.Alloc()
preemptedAllocDiff := &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: alloc.ID,
}
s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID))
s1.State().UpsertAllocs(1100, []*structs.Allocation{stoppedAlloc, preemptedAlloc})
// Create an eval
eval := mock.Eval()
eval.JobID = alloc.JobID
if err := s1.State().UpsertEvals(1, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
timestampBeforeCommit := time.Now().UTC().UnixNano()
planRes := &structs.PlanResult{
NodeAllocation: map[string][]*structs.Allocation{
node.ID: {alloc},
},
NodeUpdate: map[string][]*structs.Allocation{
stoppedAlloc.NodeID: {stoppedAllocDiff},
},
NodePreemptions: map[string][]*structs.Allocation{
preemptedAlloc.NodeID: {preemptedAllocDiff},
},
Deployment: dnew,
DeploymentUpdates: updates,
}
// Snapshot the state
snap, err := s1.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
// Create the plan with a deployment
plan := &structs.Plan{
Job: alloc.Job,
Deployment: dnew,
DeploymentUpdates: updates,
EvalID: eval.ID,
}
// Apply the plan
future, err := s1.applyPlan(plan, planRes, snap)
assert := assert.New(t)
assert.Nil(err)
// Verify our optimistic snapshot is updated
ws := memdb.NewWatchSet()
allocOut, err := snap.AllocByID(ws, alloc.ID)
assert.Nil(err)
assert.NotNil(allocOut)
deploymentOut, err := snap.DeploymentByID(ws, plan.Deployment.ID)
assert.Nil(err)
assert.NotNil(deploymentOut)
// Check plan does apply cleanly
index, err := planWaitFuture(future)
assert.Nil(err)
assert.NotEqual(0, index)
// Lookup the allocation
fsmState := s1.fsm.State()
allocOut, err = fsmState.AllocByID(ws, alloc.ID)
assert.Nil(err)
assert.NotNil(allocOut)
assert.True(allocOut.CreateTime > 0)
assert.True(allocOut.ModifyTime > 0)
assert.Equal(allocOut.CreateTime, allocOut.ModifyTime)
// Verify stopped alloc diff applied cleanly
updatedStoppedAlloc, err := fsmState.AllocByID(ws, stoppedAlloc.ID)
assert.Nil(err)
assert.NotNil(updatedStoppedAlloc)
assert.True(updatedStoppedAlloc.ModifyTime > timestampBeforeCommit)
assert.Equal(updatedStoppedAlloc.DesiredDescription, stoppedAllocDiff.DesiredDescription)
assert.Equal(updatedStoppedAlloc.ClientStatus, stoppedAllocDiff.ClientStatus)
assert.Equal(updatedStoppedAlloc.DesiredStatus, structs.AllocDesiredStatusStop)
// Verify preempted alloc diff applied cleanly
updatedPreemptedAlloc, err := fsmState.AllocByID(ws, preemptedAlloc.ID)
assert.Nil(err)
assert.NotNil(updatedPreemptedAlloc)
assert.True(updatedPreemptedAlloc.ModifyTime > timestampBeforeCommit)
assert.Equal(updatedPreemptedAlloc.DesiredDescription,
"Preempted by alloc ID " + preemptedAllocDiff.PreemptedByAllocation)
assert.Equal(updatedPreemptedAlloc.DesiredStatus, structs.AllocDesiredStatusEvict)
// Lookup the new deployment
dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID)
assert.Nil(err)
assert.NotNil(dout)
// Lookup the updated deployment
dout2, err := fsmState.DeploymentByID(ws, oldDeployment.ID)
assert.Nil(err)
assert.NotNil(dout2)
assert.Equal(desiredStatus, dout2.Status)
assert.Equal(desiredStatusDescription, dout2.StatusDescription)
// Lookup updated eval
evalOut, err := fsmState.EvalByID(ws, eval.ID)
assert.Nil(err)
assert.NotNil(evalOut)
assert.Equal(index, evalOut.ModifyIndex)
}
func TestPlanApply_EvalPlan_Simple(t *testing.T) {
t.Parallel()
state := testStateStore(t)


@@ -9,7 +9,7 @@ import (
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@@ -88,6 +88,7 @@ func TestStateStore_Blocking_MinQuery(t *testing.T) {
}
}
// COMPAT 0.11: Uses AllocUpdateRequest.Alloc
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are created
@@ -140,6 +141,94 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing
assert.EqualValues(1000, evalOut.ModifyIndex)
}
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are denormalized and updated with the diff
func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil
stoppedAlloc := mock.Alloc()
stoppedAlloc.Job = job
stoppedAllocDiff := &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: "desired desc",
ClientStatus: structs.AllocClientStatusLost,
}
preemptedAlloc := mock.Alloc()
preemptedAlloc.Job = job
preemptedAllocDiff := &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: alloc.ID,
}
if err := state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertJob(999, job); err != nil {
t.Fatalf("err: %v", err)
}
eval := mock.Eval()
eval.JobID = job.ID
// Create an eval
if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
// Create a plan result
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsUpdated: []*structs.Allocation{alloc},
AllocsStopped: []*structs.Allocation{stoppedAllocDiff},
Job: job,
},
EvalID: eval.ID,
NodePreemptions: []*structs.Allocation{preemptedAllocDiff},
}
assert := assert.New(t)
planModifyIndex := uint64(1000)
err := state.UpsertPlanResults(planModifyIndex, &res)
assert.Nil(err)
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
assert.Nil(err)
assert.Equal(alloc, out)
updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID)
assert.Nil(err)
assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription)
assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus)
assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID)
assert.Nil(err)
assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus)
assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
index, err := state.Index("allocs")
assert.Nil(err)
assert.EqualValues(planModifyIndex, index)
if watchFired(ws) {
t.Fatalf("bad")
}
evalOut, err := state.EvalByID(ws, eval.ID)
assert.Nil(err)
assert.NotNil(evalOut)
assert.EqualValues(planModifyIndex, evalOut.ModifyIndex)
}
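// A minimal sketch (not part of this commit) of the denormalization the test
// above expects: the allocation already in state is copied and only the fields
// carried by the stop diff are overwritten. The helper name is hypothetical.
func denormalizeStoppedAllocSketch(existing, diff *structs.Allocation, index uint64) *structs.Allocation {
	merged := existing.Copy()
	merged.DesiredDescription = diff.DesiredDescription
	merged.DesiredStatus = structs.AllocDesiredStatusStop
	merged.ClientStatus = diff.ClientStatus
	merged.AllocModifyIndex = index
	merged.ModifyIndex = index
	return merged
}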
// This test checks that the deployment is created and allocations count towards
// the deployment
func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
@@ -271,11 +360,9 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
require.NoError(err)
minimalPreemptedAlloc := &structs.Allocation{
- ID: preemptedAlloc.ID,
- Namespace: preemptedAlloc.Namespace,
- DesiredStatus: structs.AllocDesiredStatusEvict,
- ModifyTime: time.Now().Unix(),
- DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID),
+ ID: preemptedAlloc.ID,
+ PreemptedByAllocation: alloc.ID,
+ ModifyTime: time.Now().Unix(),
}
// Create eval for preempted job
@@ -316,7 +403,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
preempted, err := state.AllocByID(ws, preemptedAlloc.ID)
require.NoError(err)
require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict)
- require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID))
+ require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by alloc ID %v", alloc.ID))
// Verify eval for preempted job
preemptedJobEval, err := state.EvalByID(ws, eval2.ID)


@@ -9,7 +9,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
@@ -2842,6 +2842,151 @@ func TestTaskArtifact_Validate_Checksum(t *testing.T) {
}
}
func TestPlan_NormalizeAllocationsWhenNormalizeAllocsIsTrue(t *testing.T) {
t.Parallel()
plan := &Plan{
NodeUpdate: make(map[string][]*Allocation),
NodePreemptions: make(map[string][]*Allocation),
}
plan.NormalizeAllocs = true
stoppedAlloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost)
preemptedAlloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
plan.NormalizeAllocations()
actualStoppedAlloc := plan.NodeUpdate[stoppedAlloc.NodeID][0]
expectedStoppedAlloc := &Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: desiredDesc,
ClientStatus: AllocClientStatusLost,
}
assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc)
actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0]
expectedPreemptedAlloc := &Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptingAllocID,
}
assert.Equal(t, expectedPreemptedAlloc, actualPreemptedAlloc)
}
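// A minimal sketch (not this commit's implementation) of what
// NormalizeAllocations does when NormalizeAllocs is set, matching the
// expectations above: each stopped or preempted allocation is replaced by a
// stub carrying only its ID and the fields that changed.
func normalizeAllocationsSketch(p *Plan) {
	for node, allocs := range p.NodeUpdate {
		for i, alloc := range allocs {
			p.NodeUpdate[node][i] = &Allocation{
				ID:                 alloc.ID,
				DesiredDescription: alloc.DesiredDescription,
				ClientStatus:       alloc.ClientStatus,
			}
		}
	}
	for node, allocs := range p.NodePreemptions {
		for i, alloc := range allocs {
			p.NodePreemptions[node][i] = &Allocation{
				ID:                    alloc.ID,
				PreemptedByAllocation: alloc.PreemptedByAllocation,
			}
		}
	}
}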
func TestPlan_NormalizeAllocationsWhenNormalizeAllocsIsFalse(t *testing.T) {
t.Parallel()
plan := &Plan{
NodeUpdate: make(map[string][]*Allocation),
NodePreemptions: make(map[string][]*Allocation),
}
plan.NormalizeAllocs = false
stoppedAlloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost)
preemptedAlloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
plan.NormalizeAllocations()
actualStoppedAlloc := plan.NodeUpdate[stoppedAlloc.NodeID][0]
expectedStoppedAlloc := new(Allocation)
*expectedStoppedAlloc = *stoppedAlloc
expectedStoppedAlloc.DesiredDescription = desiredDesc
expectedStoppedAlloc.DesiredStatus = AllocDesiredStatusStop
expectedStoppedAlloc.ClientStatus = AllocClientStatusLost
expectedStoppedAlloc.Job = nil
assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc)
actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0]
expectedPreemptedAlloc := &Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptingAllocID,
JobID: preemptedAlloc.JobID,
Namespace: preemptedAlloc.Namespace,
DesiredStatus: AllocDesiredStatusEvict,
DesiredDescription: fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID),
AllocatedResources: preemptedAlloc.AllocatedResources,
TaskResources: preemptedAlloc.TaskResources,
SharedResources: preemptedAlloc.SharedResources,
}
assert.Equal(t, expectedPreemptedAlloc, actualPreemptedAlloc)
}
func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
t.Parallel()
plan := &Plan{
NodeUpdate: make(map[string][]*Allocation),
}
alloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
expectedAlloc := new(Allocation)
*expectedAlloc = *alloc
expectedAlloc.DesiredDescription = desiredDesc
expectedAlloc.DesiredStatus = AllocDesiredStatusStop
expectedAlloc.ClientStatus = AllocClientStatusLost
expectedAlloc.Job = nil
assert.Equal(t, expectedAlloc, appendedAlloc)
assert.Equal(t, alloc.Job, plan.Job)
}
func TestPlan_AppendPreemptedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
t.Parallel()
plan := &Plan{
NodePreemptions: make(map[string][]*Allocation),
}
alloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(alloc, preemptingAllocID)
appendedAlloc := plan.NodePreemptions[alloc.NodeID][0]
expectedAlloc := &Allocation{
ID: alloc.ID,
PreemptedByAllocation: preemptingAllocID,
JobID: alloc.JobID,
Namespace: alloc.Namespace,
DesiredStatus: AllocDesiredStatusEvict,
DesiredDescription: fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID),
AllocatedResources: alloc.AllocatedResources,
TaskResources: alloc.TaskResources,
SharedResources: alloc.SharedResources,
}
assert.Equal(t, expectedAlloc, appendedAlloc)
}
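// The tag assertions below pin the wire format: `codec:",omitempty"` on the
// _struct field makes the msgpack encoder drop zero-value fields (keeping the
// normalized allocation stubs small on the wire), while `codec:"-"` excludes
// NormalizeAllocs from serialization altogether.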
func TestPlan_MsgPackTags(t *testing.T) {
t.Parallel()
planType := reflect.TypeOf(Plan{})
msgPackTags, _ := planType.FieldByName("_struct")
normalizeTag, _ := planType.FieldByName("NormalizeAllocs")
assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`))
assert.Equal(t, normalizeTag.Tag, reflect.StructTag(`codec:"-"`))
}
func TestAllocation_MsgPackTags(t *testing.T) {
t.Parallel()
planType := reflect.TypeOf(Allocation{})
msgPackTags, _ := planType.FieldByName("_struct")
assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`))
}
func TestEvaluation_MsgPackTags(t *testing.T) {
t.Parallel()
planType := reflect.TypeOf(Evaluation{})
msgPackTags, _ := planType.FieldByName("_struct")
assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`))
}
func TestAllocation_Terminated(t *testing.T) {
type desiredState struct {
ClientStatus string


@@ -86,23 +86,8 @@ func TestIsNomadServer(t *testing.T) {
}
}
- func TestServersMeetMinimumVersion(t *testing.T) {
+ func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) {
t.Parallel()
- makeMember := func(version string) serf.Member {
- return serf.Member{
- Name: "foo",
- Addr: net.IP([]byte{127, 0, 0, 1}),
- Tags: map[string]string{
- "role": "nomad",
- "region": "aws",
- "dc": "east-aws",
- "port": "10000",
- "build": version,
- "vsn": "1",
- },
- Status: serf.StatusAlive,
- }
- }
cases := []struct {
members []serf.Member
@@ -112,7 +97,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server, meets reqs
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.7.5", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@@ -120,7 +105,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server in dev, meets reqs
{
members: []serf.Member{
makeMember("0.8.5-dev"),
makeMember("0.8.5-dev", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@@ -128,7 +113,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server with meta, meets reqs
{
members: []serf.Member{
makeMember("0.7.5+ent"),
makeMember("0.7.5+ent", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@@ -136,16 +121,17 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server, doesn't meet reqs
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.7.5", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
},
- // Multiple servers, meets req version
+ // Multiple servers, meets req version, includes failed that doesn't meet req
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.8.0"),
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
makeMember("0.7.0", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@@ -153,8 +139,8 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.8.0"),
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
@@ -169,6 +155,60 @@ func TestServersMeetMinimumVersion(t *testing.T) {
}
}
func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
t.Parallel()
cases := []struct {
members []serf.Member
ver *version.Version
expected bool
}{
// Multiple servers, meets req version
{
members: []serf.Member{
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
makeMember("0.7.5", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
},
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
makeMember("0.7.0", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: false,
},
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}
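// Taken together, these cases pin down the new boolean parameter on
// ServersMeetMinimumVersion: when it is false, servers in serf.StatusFailed
// are presumably skipped by the version check; when it is true, failed
// servers count against the minimum version.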
func makeMember(version string, status serf.MemberStatus) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "nomad",
"region": "aws",
"dc": "east-aws",
"port": "10000",
"build": version,
"vsn": "1",
},
Status: status,
}
}
func TestShuffleStrings(t *testing.T) {
t.Parallel()
// Generate input


@@ -8,20 +8,22 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
)
type NoopScheduler struct {
- state scheduler.State
- planner scheduler.Planner
- eval *structs.Evaluation
- err error
+ state                 scheduler.State
+ planner               scheduler.Planner
+ eval                  *structs.Evaluation
+ allowPlanOptimization bool
+ err                   error
}
func (n *NoopScheduler) Process(eval *structs.Evaluation) error {
@@ -38,8 +40,9 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error {
func init() {
scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner, allowPlanOptimization bool) scheduler.Scheduler {
n := &NoopScheduler{
- state: s,
- planner: p,
+ state:                 s,
+ planner:               p,
+ allowPlanOptimization: allowPlanOptimization,
}
return n
}
@@ -390,6 +393,57 @@ func TestWorker_SubmitPlan(t *testing.T) {
}
}
func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Register node
node := mock.Node()
testRegisterNode(t, s1, node)
job := mock.Job()
eval1 := mock.Eval()
eval1.JobID = job.ID
s1.fsm.State().UpsertJob(0, job)
s1.fsm.State().UpsertEvals(0, []*structs.Evaluation{eval1})
stoppedAlloc := mock.Alloc()
preemptedAlloc := mock.Alloc()
s1.fsm.State().UpsertAllocs(5, []*structs.Allocation{stoppedAlloc, preemptedAlloc})
// Create an allocation plan
plan := &structs.Plan{
Job: job,
EvalID: eval1.ID,
NodeUpdate: make(map[string][]*structs.Allocation),
NodePreemptions: make(map[string][]*structs.Allocation),
NormalizeAllocs: true,
}
desiredDescription := "desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost)
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
// Attempt to submit a plan
w := &Worker{srv: s1, logger: s1.logger}
w.SubmitPlan(plan)
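// SubmitPlan should have normalized the plan in place before submitting it
// (NormalizeAllocs is set above), leaving only the minimal stubs asserted on
// below.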
assert.Equal(t, &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptingAllocID,
}, plan.NodePreemptions[preemptedAlloc.NodeID][0])
assert.Equal(t, &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: desiredDescription,
ClientStatus: structs.AllocClientStatusLost,
}, plan.NodeUpdate[stoppedAlloc.NodeID][0])
}
func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@@ -5,9 +5,9 @@ import (
"sync"
"time"
testing "github.com/mitchellh/go-testing-interface"
"github.com/mitchellh/go-testing-interface"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -53,26 +53,30 @@ type Harness struct {
nextIndex uint64
nextIndexLock sync.Mutex
allowPlanOptimization bool
}
// NewHarness is used to make a new testing harness
- func NewHarness(t testing.T) *Harness {
+ func NewHarness(t testing.T, allowPlanOptimization bool) *Harness {
state := state.TestStateStore(t)
h := &Harness{
- t: t,
- State: state,
- nextIndex: 1,
+ t:         t,
+ State:     state,
+ nextIndex: 1,
+ allowPlanOptimization: allowPlanOptimization,
}
return h
}
// NewHarnessWithState creates a new harness with the given state for testing
// purposes.
- func NewHarnessWithState(t testing.T, state *state.StateStore) *Harness {
+ func NewHarnessWithState(t testing.T, state *state.StateStore, allowPlanOptimization bool) *Harness {
return &Harness{
- t: t,
- State: state,
- nextIndex: 1,
+ t:         t,
+ State:     state,
+ nextIndex: 1,
+ allowPlanOptimization: allowPlanOptimization,
}
}
@@ -101,22 +105,17 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
result.AllocIndex = index
// Flatten evicts and allocs
- var allocs []*structs.Allocation
+ now := time.Now().UTC().UnixNano()
+ allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
for _, updateList := range plan.NodeUpdate {
- allocs = append(allocs, updateList...)
- }
- for _, allocList := range plan.NodeAllocation {
- allocs = append(allocs, allocList...)
+ allocsStopped = append(allocsStopped, updateList...)
}
- // Set the time the alloc was applied for the first time. This can be used
- // to approximate the scheduling time.
- now := time.Now().UTC().UnixNano()
- for _, alloc := range allocs {
- if alloc.CreateTime == 0 {
- alloc.CreateTime = now
- }
+ allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
+ for _, allocList := range plan.NodeAllocation {
+ allocsUpdated = append(allocsUpdated, allocList...)
}
+ updateCreateTimestamp(allocsUpdated, now)
// Set modify time for preempted allocs and flatten them
var preemptedAllocs []*structs.Allocation
@@ -130,8 +129,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
- Job: plan.Job,
- Alloc: allocs,
+ Job: plan.Job,
},
Deployment: plan.Deployment,
DeploymentUpdates: plan.DeploymentUpdates,
@@ -139,11 +137,33 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
NodePreemptions: preemptedAllocs,
}
if h.allowPlanOptimization {
req.AllocsStopped = allocsStopped
req.AllocsUpdated = allocsUpdated
} else {
// Deprecated: Handles unoptimized log format
var allocs []*structs.Allocation
allocs = append(allocs, allocsStopped...)
allocs = append(allocs, allocsUpdated...)
updateCreateTimestamp(allocs, now)
req.Alloc = allocs
}
// Apply the full plan
err := h.State.UpsertPlanResults(index, &req)
return result, nil, err
}
func updateCreateTimestamp(allocations []*structs.Allocation, now int64) {
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
}
}
func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
// Ensure sequential plan application
h.planLock.Lock()
@@ -214,15 +234,15 @@ func (h *Harness) Snapshot() State {
// Scheduler is used to return a new scheduler from
// a snapshot of current state using the harness for planning.
- func (h *Harness) Scheduler(factory Factory) Scheduler {
+ func (h *Harness) Scheduler(factory Factory, allowPlanOptimization bool) Scheduler {
logger := testlog.HCLogger(h.t)
- return factory(logger, h.Snapshot(), h, false)
+ return factory(logger, h.Snapshot(), h, allowPlanOptimization)
}
// Process is used to process an evaluation given a factory
// function to create the scheduler
func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
- sched := h.Scheduler(factory)
+ sched := h.Scheduler(factory, h.allowPlanOptimization)
return sched.Process(eval)
}


@@ -621,7 +621,7 @@ func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) {
}
func TestSetStatus(t *testing.T) {
- h := NewHarness(t)
+ h := NewHarness(t, true)
logger := testlog.HCLogger(t)
eval := mock.Eval()
status := "a"
@@ -640,7 +640,7 @@ func TestSetStatus(t *testing.T) {
}
// Test next evals
- h = NewHarness(t)
+ h = NewHarness(t, true)
next := mock.Eval()
if err := setStatus(logger, h, eval, next, nil, nil, status, desc, nil, ""); err != nil {
t.Fatalf("setStatus() failed: %v", err)
@@ -656,7 +656,7 @@ func TestSetStatus(t *testing.T) {
}
// Test blocked evals
- h = NewHarness(t)
+ h = NewHarness(t, true)
blocked := mock.Eval()
if err := setStatus(logger, h, eval, nil, blocked, nil, status, desc, nil, ""); err != nil {
t.Fatalf("setStatus() failed: %v", err)
@@ -672,7 +672,7 @@ func TestSetStatus(t *testing.T) {
}
// Test metrics
- h = NewHarness(t)
+ h = NewHarness(t, true)
metrics := map[string]*structs.AllocMetric{"foo": nil}
if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc, nil, ""); err != nil {
t.Fatalf("setStatus() failed: %v", err)
@@ -688,7 +688,7 @@ func TestSetStatus(t *testing.T) {
}
// Test queued allocations
- h = NewHarness(t)
+ h = NewHarness(t, true)
queuedAllocs := map[string]int{"web": 1}
if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc, queuedAllocs, ""); err != nil {
@@ -704,7 +704,7 @@ func TestSetStatus(t *testing.T) {
t.Fatalf("setStatus() didn't set failed task group metrics correctly: %v", newEval)
}
- h = NewHarness(t)
+ h = NewHarness(t, true)
dID := uuid.Generate()
if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc, queuedAllocs, dID); err != nil {
t.Fatalf("setStatus() failed: %v", err)