diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go
index 1f2ecf079..7b5b96b96 100644
--- a/e2e/e2e_test.go
+++ b/e2e/e2e_test.go
@@ -25,7 +25,6 @@ import (
 	_ "github.com/hashicorp/nomad/e2e/quotas"
 	_ "github.com/hashicorp/nomad/e2e/scalingpolicies"
 	_ "github.com/hashicorp/nomad/e2e/scheduler_sysbatch"
-	_ "github.com/hashicorp/nomad/e2e/scheduler_system"
 	_ "github.com/hashicorp/nomad/e2e/taskevents"
 
 	// these are no longer on the old framework but by importing them
@@ -44,6 +43,7 @@ import (
 	_ "github.com/hashicorp/nomad/e2e/podman"
 	_ "github.com/hashicorp/nomad/e2e/rescheduling"
 	_ "github.com/hashicorp/nomad/e2e/scaling"
+	_ "github.com/hashicorp/nomad/e2e/scheduler_system"
 	_ "github.com/hashicorp/nomad/e2e/secret"
 	_ "github.com/hashicorp/nomad/e2e/spread"
 	_ "github.com/hashicorp/nomad/e2e/vaultsecrets"
diff --git a/e2e/scheduler_system/doc.go b/e2e/scheduler_system/doc.go
new file mode 100644
index 000000000..7e468fbcb
--- /dev/null
+++ b/e2e/scheduler_system/doc.go
@@ -0,0 +1,5 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+// Package scheduler_system contains test cases related to the system scheduler.
+package scheduler_system
diff --git a/e2e/scheduler_system/input/system_canary_v0.nomad.hcl b/e2e/scheduler_system/input/system_canary_v0.nomad.hcl
new file mode 100644
index 000000000..ede0217a9
--- /dev/null
+++ b/e2e/scheduler_system/input/system_canary_v0.nomad.hcl
@@ -0,0 +1,48 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: BUSL-1.1
+
+job "system_job" {
+  datacenters = ["dc1", "dc2"]
+
+  type = "system"
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "system_job_group" {
+    count = 1
+
+    update {
+      max_parallel     = 1
+      min_healthy_time = "1s"
+      healthy_deadline = "1m"
+      auto_revert      = false
+      canary           = 50
+    }
+
+    restart {
+      attempts = 10
+      interval = "1m"
+
+      delay = "2s"
+      mode  = "delay"
+    }
+
+    task "system_task" {
+      driver = "docker"
+
+      config {
+        image = "busybox:1"
+
+        command = "/bin/sh"
+        args    = ["-c", "sleep 15000"]
+      }
+
+      env {
+        version = "0"
+      }
+    }
+  }
+}
diff --git a/e2e/scheduler_system/input/system_canary_v1.nomad.hcl b/e2e/scheduler_system/input/system_canary_v1.nomad.hcl
new file mode 100644
index 000000000..74176468f
--- /dev/null
+++ b/e2e/scheduler_system/input/system_canary_v1.nomad.hcl
@@ -0,0 +1,48 @@
+# Copyright (c) HashiCorp, Inc.
+# SPDX-License-Identifier: BUSL-1.1
+
+job "system_job" {
+  datacenters = ["dc1", "dc2"]
+
+  type = "system"
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "system_job_group" {
+    count = 1
+
+    update {
+      max_parallel     = 1
+      min_healthy_time = "1s"
+      healthy_deadline = "1m"
+      auto_revert      = false
+      canary           = 50
+    }
+
+    restart {
+      attempts = 10
+      interval = "1m"
+
+      delay = "2s"
+      mode  = "delay"
+    }
+
+    task "system_task" {
+      driver = "docker"
+
+      config {
+        image = "busybox:1"
+
+        command = "/bin/sh"
+        args    = ["-c", "sleep 150000"]
+      }
+
+      env {
+        version = "1"
+      }
+    }
+  }
+}
diff --git a/e2e/scheduler_system/systemsched.go b/e2e/scheduler_system/systemsched.go
deleted file mode 100644
index dd614dac0..000000000
--- a/e2e/scheduler_system/systemsched.go
+++ /dev/null
@@ -1,138 +0,0 @@
-// Copyright (c) HashiCorp, Inc.
-// SPDX-License-Identifier: BUSL-1.1 - -package scheduler_system - -import ( - "github.com/hashicorp/nomad/api" - "github.com/hashicorp/nomad/e2e/e2eutil" - "github.com/hashicorp/nomad/e2e/framework" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/require" -) - -type SystemSchedTest struct { - framework.TC - jobIDs []string -} - -func init() { - framework.AddSuites(&framework.TestSuite{ - Component: "SystemScheduler", - CanRunLocal: true, - Cases: []framework.TestCase{ - new(SystemSchedTest), - }, - }) -} - -func (tc *SystemSchedTest) BeforeAll(f *framework.F) { - // Ensure cluster has leader before running tests - e2eutil.WaitForLeader(f.T(), tc.Nomad()) - e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 4) -} - -func (tc *SystemSchedTest) TestJobUpdateOnIneligbleNode(f *framework.F) { - t := f.T() - nomadClient := tc.Nomad() - - jobID := "system_deployment" - tc.jobIDs = append(tc.jobIDs, jobID) - e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_system/input/system_job0.nomad", jobID, "") - - jobs := nomadClient.Jobs() - allocs, _, err := jobs.Allocations(jobID, true, nil) - require.NoError(t, err) - require.True(t, len(allocs) >= 3) - - allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs) - - // Wait for allocations to get past initial pending state - e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) - - // Mark one node as ineligible - nodesAPI := tc.Nomad().Nodes() - disabledNodeID := allocs[0].NodeID - _, err = nodesAPI.ToggleEligibility(disabledNodeID, false, nil) - require.NoError(t, err) - - // Assert all jobs still running - jobs = nomadClient.Jobs() - allocs, _, err = jobs.Allocations(jobID, true, nil) - require.NoError(t, err) - - allocIDs = e2eutil.AllocIDsFromAllocationListStubs(allocs) - allocForDisabledNode := make(map[string]*api.AllocationListStub) - - // Wait for allocs to run and collect allocs on ineligible node - // Allocation could have failed, ensure there is one thats running - // and that it is the correct version (0) - e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) - for _, alloc := range allocs { - if alloc.NodeID == disabledNodeID { - allocForDisabledNode[alloc.ID] = alloc - } - } - - // Filter down to only our latest running alloc - for _, alloc := range allocForDisabledNode { - require.Equal(t, uint64(0), alloc.JobVersion) - if alloc.ClientStatus == structs.AllocClientStatusComplete { - // remove the old complete alloc from map - delete(allocForDisabledNode, alloc.ID) - } - } - require.NotEmpty(t, allocForDisabledNode) - require.Len(t, allocForDisabledNode, 1) - - // Update job - e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_system/input/system_job1.nomad", jobID, "") - - // Get updated allocations - jobs = nomadClient.Jobs() - allocs, _, err = jobs.Allocations(jobID, false, nil) - require.NoError(t, err) - - // Wait for allocs to start - allocIDs = e2eutil.AllocIDsFromAllocationListStubs(allocs) - e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) - - // Get latest alloc status now that they are no longer pending - allocs, _, err = jobs.Allocations(jobID, false, nil) - require.NoError(t, err) - - var foundPreviousAlloc bool - for _, dAlloc := range allocForDisabledNode { - for _, alloc := range allocs { - if alloc.ID == dAlloc.ID { - foundPreviousAlloc = true - require.Equal(t, uint64(0), alloc.JobVersion) - } else if alloc.ClientStatus == structs.AllocClientStatusRunning { - // Ensure allocs running on non disabled node are - // newer version - require.Equal(t, uint64(1), 
alloc.JobVersion)
-			}
-		}
-	}
-	require.True(t, foundPreviousAlloc, "unable to find previous alloc for ineligible node")
-}
-
-func (tc *SystemSchedTest) AfterEach(f *framework.F) {
-	nomadClient := tc.Nomad()
-
-	// Mark all nodes eligible
-	nodesAPI := tc.Nomad().Nodes()
-	nodes, _, _ := nodesAPI.List(nil)
-	for _, node := range nodes {
-		nodesAPI.ToggleEligibility(node.ID, true, nil)
-	}
-
-	jobs := nomadClient.Jobs()
-	// Stop all jobs in test
-	for _, id := range tc.jobIDs {
-		jobs.Deregister(id, true, nil)
-	}
-	tc.jobIDs = []string{}
-	// Garbage collect
-	nomadClient.System().GarbageCollect()
-}
diff --git a/e2e/scheduler_system/systemsched_test.go b/e2e/scheduler_system/systemsched_test.go
new file mode 100644
index 000000000..ee4c3b89f
--- /dev/null
+++ b/e2e/scheduler_system/systemsched_test.go
@@ -0,0 +1,203 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package scheduler_system
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/e2e/v3/cluster3"
+	"github.com/hashicorp/nomad/e2e/v3/jobs3"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/shoenig/test/must"
+)
+
+func TestSystemScheduler(t *testing.T) {
+	cluster3.Establish(t,
+		cluster3.Leader(),
+		cluster3.LinuxClients(3),
+	)
+
+	t.Run("testJobUpdateOnIneligibleNode", testJobUpdateOnIneligibleNode)
+	t.Run("testCanaryUpdate", testCanaryUpdate)
+}
+
+func testJobUpdateOnIneligibleNode(t *testing.T) {
+	job, cleanup := jobs3.Submit(t,
+		"./input/system_job0.nomad",
+		jobs3.DisableRandomJobID(),
+	)
+	t.Cleanup(cleanup)
+
+	allocs := job.Allocs()
+	must.True(t, len(allocs) >= 3)
+
+	// Mark one node as ineligible
+	nodesAPI := job.NodesApi()
+	disabledNodeID := allocs[0].NodeID
+	_, err := nodesAPI.ToggleEligibility(disabledNodeID, false, nil)
+	must.NoError(t, err)
+
+	// make sure to mark all nodes as eligible once we're done
+	t.Cleanup(func() {
+		nodes, _, err := nodesAPI.List(nil)
+		must.NoError(t, err)
+		for _, n := range nodes {
+			_, err := nodesAPI.ToggleEligibility(n.ID, true, nil)
+			must.NoError(t, err)
+		}
+	})
+
+	// Assert all allocs are still running
+	allocs = job.Allocs()
+	must.SliceNotEmpty(t, allocs)
+
+	allocForDisabledNode := make(map[string]*api.AllocationListStub)
+	for _, alloc := range allocs {
+		if alloc.NodeID == disabledNodeID {
+			allocForDisabledNode[alloc.ID] = alloc
+		}
+	}
+
+	// Update job
+	job2, cleanup2 := jobs3.Submit(t,
+		"./input/system_job1.nomad",
+		jobs3.DisableRandomJobID(),
+	)
+	t.Cleanup(cleanup2)
+
+	// Get updated allocations
+	allocs = job2.Allocs()
+	must.SliceNotEmpty(t, allocs)
+
+	var foundPreviousAlloc bool
+	for _, dAlloc := range allocForDisabledNode {
+		for _, alloc := range allocs {
+			if alloc.ID == dAlloc.ID {
+				foundPreviousAlloc = true
+				must.Eq(t, uint64(0), alloc.JobVersion)
+			} else if alloc.ClientStatus == structs.AllocClientStatusRunning {
+				// Ensure allocs running on non-disabled nodes are the
+				// newer version
+				must.Eq(t, uint64(1), alloc.JobVersion)
+			}
+		}
+	}
+	must.True(t, foundPreviousAlloc, must.Sprint("unable to find previous alloc for ineligible node"))
+}
+
+func testCanaryUpdate(t *testing.T) {
+	_, cleanup := jobs3.Submit(t,
+		"./input/system_canary_v0.nomad.hcl",
+		jobs3.DisableRandomJobID(),
+	)
+	t.Cleanup(cleanup)
+
+	// Update job
+	job2, cleanup2 := jobs3.Submit(t,
+		"./input/system_canary_v1.nomad.hcl",
+		jobs3.DisableRandomJobID(),
+		jobs3.Detach(),
+	)
+	t.Cleanup(cleanup2)
+
+	// how many eligible nodes do we have?
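+	// (a system job places an alloc on every eligible node, and canary = 50
+	// means canaries should land on half of them)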
+	nodesApi := job2.NodesApi()
+	nodesList, _, err := nodesApi.List(nil)
+	must.NoError(t, err)
+	must.SliceNotEmpty(t, nodesList)
+
+	numberOfEligibleNodes := 0
+	for _, n := range nodesList {
+		if n.SchedulingEligibility == api.NodeSchedulingEligible {
+			numberOfEligibleNodes++
+		}
+	}
+
+	// Get updated allocations
+	allocs := job2.Allocs()
+	must.SliceNotEmpty(t, allocs)
+
+	deploymentsApi := job2.DeploymentsApi()
+	deploymentsList, _, err := deploymentsApi.List(nil)
+	must.NoError(t, err)
+
+	var deployment *api.Deployment
+	for _, d := range deploymentsList {
+		if d.JobID == job2.JobID() && d.Status == api.DeploymentStatusRunning {
+			deployment = d
+		}
+	}
+	must.NotNil(t, deployment)
+
+	// wait for the canary allocations to become healthy
+	waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	job2.WaitForDeploymentFunc(waitCtx, deployment.ID, func(d *api.Deployment) bool {
+		for _, tg := range d.TaskGroups { // we only have 1 tg in this job
+			if d.JobVersion == 1 && tg.HealthyAllocs >= tg.DesiredCanaries {
+				return true
+			}
+		}
+		return false
+	})
+
+	// find allocations from the v1 version of the job; they should all be
+	// canaries, placed on half of the eligible nodes (canary = 50)
+	count := 0
+	for _, a := range allocs {
+		if a.JobVersion == 1 {
+			must.True(t, a.DeploymentStatus.Canary)
+			count++
+		}
+	}
+	must.Eq(t, numberOfEligibleNodes/2, count, must.Sprint("expected canaries to be placed on 50% of eligible nodes"))
+
+	// promote canaries on the running deployment
+	deployments, _, err := deploymentsApi.List(nil)
+	must.NoError(t, err)
+	must.SliceLen(t, 2, deployments)
+	_, _, err = deploymentsApi.PromoteAll(deployment.ID, nil)
+	must.NoError(t, err)
+
+	// promoting canaries on a system job should result in a new deployment
+	deploymentsList, _, err = deploymentsApi.List(nil)
+	must.NoError(t, err)
+
+	for _, d := range deploymentsList {
+		if d.JobID == job2.JobID() && d.Status == api.DeploymentStatusRunning {
+			deployment = d
+			break
+		}
+	}
+	must.NotNil(t, deployment)
+
+	// wait for the promoted allocations to become healthy (same 30s budget)
+	job2.WaitForDeploymentFunc(waitCtx, deployment.ID, func(d *api.Deployment) bool {
+		for _, tg := range d.TaskGroups { // we only have 1 tg in this job
+			if d.JobVersion == 1 && tg.HealthyAllocs >= tg.DesiredTotal {
+				return true
+			}
+		}
+		return false
+	})
+
+	// expect the number of allocations for the promoted deployment to match
+	// the number of eligible nodes
+	newAllocs := job2.Allocs()
+	must.SliceNotEmpty(t, newAllocs)
+
+	promotedAllocs := 0
+	for _, a := range newAllocs {
+		if a.JobVersion == 1 {
+			promotedAllocs++
+		}
+	}
+	must.Eq(t, numberOfEligibleNodes, promotedAllocs)
+}
diff --git a/e2e/v3/cluster3/cluster3.go b/e2e/v3/cluster3/cluster3.go
index 13693a93e..f8eb8c6d3 100644
--- a/e2e/v3/cluster3/cluster3.go
+++ b/e2e/v3/cluster3/cluster3.go
@@ -148,7 +148,7 @@ func (c *Cluster) wait() {
 		errCh <- err
 	}()
 
-	for i := 0; i < 5; i++ {
+	for range 5 {
 		err := <-errCh
 		must.NoError(c.t, err)
 	}
diff --git a/e2e/v3/jobs3/jobs3.go b/e2e/v3/jobs3/jobs3.go
index 67a18bdad..3cfceaa98 100644
--- a/e2e/v3/jobs3/jobs3.go
+++ b/e2e/v3/jobs3/jobs3.go
@@ -75,6 +75,35 @@ func (sub *Submission) Allocs() []*nomadapi.AllocationListStub {
 	return allocs
 }
 
+// WaitForDeploymentFunc monitors the given deployment and returns once
+// the provided fn reports true.
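+// It polls the deployment every 500ms and fails the test if ctx expires
+// before fn is satisfied.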
+func (sub *Submission) WaitForDeploymentFunc(ctx context.Context,
+	deploymentID string, fn func(*nomadapi.Deployment) bool) {
+	sub.t.Helper()
+
+	deploymentsApi := sub.nomadClient.Deployments()
+	for {
+		select {
+		case <-ctx.Done():
+			must.Unreachable(sub.t, must.Sprint("timeout reached waiting for deployment"))
+		default:
+		}
+
+		deployment, _, err := deploymentsApi.Info(deploymentID, nil)
+		must.NoError(sub.t, err)
+		must.NotNil(sub.t, deployment)
+
+		if fn(deployment) {
+			return
+		}
+
+		// pause between polls so we don't hammer the Nomad API
+		time.Sleep(500 * time.Millisecond)
+	}
+}
+
 type TaskEvents struct {
 	Group string
 	Task  string
@@ -199,6 +228,14 @@ func (sub *Submission) AllocID(group string) string {
 	panic("bug")
 }
 
+func (sub *Submission) NodesApi() *nomadapi.Nodes {
+	return sub.nomadClient.Nodes()
+}
+
+func (sub *Submission) DeploymentsApi() *nomadapi.Deployments {
+	return sub.nomadClient.Deployments()
+}
+
 func (sub *Submission) logf(msg string, args ...any) {
 	sub.t.Helper()
 	util3.Log3(sub.t, sub.verbose, msg, args...)
@@ -408,7 +445,7 @@ EVAL:
 	}
 
 	switch *job.Type {
-	case "service":
+	case "service", "system":
 		// need to monitor the deployment until it is complete
 		depAPI := sub.nomadClient.Deployments()
 	DEPLOY:
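
A minimal sketch of how the new WaitForDeploymentFunc helper composes in a
test, assuming a *jobs3.Submission obtained from jobs3.Submit; the
waitHealthy wrapper and its one-minute budget are illustrative assumptions,
not part of this change:

	package scheduler_system

	import (
		"context"
		"time"

		"github.com/hashicorp/nomad/api"
		"github.com/hashicorp/nomad/e2e/v3/jobs3"
	)

	// waitHealthy blocks until every task group in the deployment reports
	// all of its desired allocations healthy, failing the test if the
	// one-minute budget expires first.
	func waitHealthy(job *jobs3.Submission, deploymentID string) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()

		job.WaitForDeploymentFunc(ctx, deploymentID, func(d *api.Deployment) bool {
			for _, tg := range d.TaskGroups {
				if tg.HealthyAllocs < tg.DesiredTotal {
					return false
				}
			}
			return true
		})
	}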