diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 2e6c6db2c..746123962 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -29,6 +29,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/rescheduling" _ "github.com/hashicorp/nomad/e2e/spread" _ "github.com/hashicorp/nomad/e2e/systemsched" + _ "github.com/hashicorp/nomad/e2e/scalingpolicies" _ "github.com/hashicorp/nomad/e2e/taskevents" _ "github.com/hashicorp/nomad/e2e/vaultsecrets" _ "github.com/hashicorp/nomad/e2e/volumes" diff --git a/e2e/scalingpolicies/input/namespace_a_1.nomad b/e2e/scalingpolicies/input/namespace_a_1.nomad new file mode 100644 index 000000000..6cfb8be85 --- /dev/null +++ b/e2e/scalingpolicies/input/namespace_a_1.nomad @@ -0,0 +1,34 @@ +job "horizontally_scalable" { + datacenters = ["dc1"] + type = "service" + namespace = "NamespaceA" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "horizontally_scalable" { + + scaling { + min = 1 + max = 10 + enabled = true + + policy { + // Setting a single value allows us to check the policy block is + // handled opaquely by Nomad. + cooldown = "13m" + } + } + + task "test" { + driver = "raw_exec" + + config { + command = "bash" + args = ["-c", "sleep 15000"] + } + } + } +} diff --git a/e2e/scalingpolicies/input/namespace_default_1.nomad b/e2e/scalingpolicies/input/namespace_default_1.nomad new file mode 100644 index 000000000..fec404f84 --- /dev/null +++ b/e2e/scalingpolicies/input/namespace_default_1.nomad @@ -0,0 +1,33 @@ +job "horizontally_scalable" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "horizontally_scalable" { + + scaling { + min = 1 + max = 10 + enabled = true + + policy { + // Setting a single value allows us to check the policy block is + // handled opaquely by Nomad. + cooldown = "13m" + } + } + + task "test" { + driver = "raw_exec" + + config { + command = "bash" + args = ["-c", "sleep 15000"] + } + } + } +} diff --git a/e2e/scalingpolicies/input/namespace_default_2.nomad b/e2e/scalingpolicies/input/namespace_default_2.nomad new file mode 100644 index 000000000..28d0f0ee3 --- /dev/null +++ b/e2e/scalingpolicies/input/namespace_default_2.nomad @@ -0,0 +1,33 @@ +job "horizontally_scalable" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "horizontally_scalable" { + + scaling { + min = 1 + max = 11 + enabled = false + + policy { + // Setting a single value allows us to check the policy block is + // handled opaquely by Nomad. + cooldown = "14m" + } + } + + task "test" { + driver = "raw_exec" + + config { + command = "bash" + args = ["-c", "sleep 15000"] + } + } + } +} diff --git a/e2e/scalingpolicies/scalingpolicies.go b/e2e/scalingpolicies/scalingpolicies.go new file mode 100644 index 000000000..3579219d2 --- /dev/null +++ b/e2e/scalingpolicies/scalingpolicies.go @@ -0,0 +1,188 @@ +package scalingpolicies + +import ( + "os" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/stretchr/testify/require" +) + +type ScalingPolicyE2ETest struct { + framework.TC + namespaceIDs []string + namespacedJobIDs [][2]string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "ScalingPolicies", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(ScalingPolicyE2ETest), + }, + }) + +} + +func (tc *ScalingPolicyE2ETest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) + e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1) +} + +func (tc *ScalingPolicyE2ETest) AfterEach(f *framework.F) { + if os.Getenv("NOMAD_TEST_SKIPCLEANUP") == "1" { + return + } + + for _, namespacedJob := range tc.namespacedJobIDs { + _, err := e2eutil.Command("nomad", "job", "stop", "-purge", "-namespace", + namespacedJob[0], namespacedJob[1]) + f.Assert().NoError(err) + } + tc.namespacedJobIDs = [][2]string{} + + for _, ns := range tc.namespaceIDs { + _, err := e2eutil.Command("nomad", "namespace", "delete", ns) + f.Assert().NoError(err) + } + tc.namespaceIDs = []string{} + + _, err := e2eutil.Command("nomad", "system", "gc") + f.Assert().NoError(err) +} + +// TestScalingPolicies multi-namespace scaling policy test which performs reads +// and job manipulations to ensure Nomad behaves as expected. +func (tc *ScalingPolicyE2ETest) TestScalingPolicies(f *framework.F) { + t := f.T() + + // Create our non-default namespace. + _, err := e2eutil.Command("nomad", "namespace", "apply", "NamespaceA") + f.NoError(err, "could not create namespace") + tc.namespaceIDs = append(tc.namespaceIDs, "NamespaceA") + + // Register the jobs, capturing their IDs. + jobDefault1 := tc.run(f, "scalingpolicies/input/namespace_default_1.nomad", "default", []string{"running"}) + jobDefault2 := tc.run(f, "scalingpolicies/input/namespace_default_1.nomad", "default", []string{"running"}) + jobA := tc.run(f, "scalingpolicies/input/namespace_a_1.nomad", "NamespaceA", []string{"running"}) + + // Setup some reused query options. + defaultQueryOpts := api.QueryOptions{Namespace: "default"} + aQueryOpts := api.QueryOptions{Namespace: "NamespaceA"} + + // Perform initial listings to check each namespace has the correct number + // of policies. + defaultPolicyList, _, err := tc.Nomad().Scaling().ListPolicies(&defaultQueryOpts) + require.NoError(t, err) + require.Len(t, defaultPolicyList, 2) + + policyListA, _, err := tc.Nomad().Scaling().ListPolicies(&aQueryOpts) + require.NoError(t, err) + require.Len(t, policyListA, 1) + + // Deregister a job from the default namespace and then check all the + // response objects. + _, _, err = tc.Nomad().Jobs().Deregister(jobDefault1, true, &api.WriteOptions{Namespace: "default"}) + require.NoError(t, err) + + for i, namespacedJob := range tc.namespacedJobIDs { + if namespacedJob[1] == jobDefault1 && namespacedJob[0] == "default" { + tc.namespacedJobIDs = append(tc.namespacedJobIDs[:i], tc.namespacedJobIDs[i+1:]...) + break + } + } + + defaultPolicyList, _, err = tc.Nomad().Scaling().ListPolicies(&defaultQueryOpts) + require.NoError(t, err) + require.Len(t, defaultPolicyList, 1) + + defaultPolicy := defaultPolicyList[0] + require.True(t, defaultPolicy.Enabled) + require.Equal(t, "horizontal", defaultPolicy.Type) + require.Equal(t, defaultPolicy.Target["Namespace"], "default") + require.Equal(t, defaultPolicy.Target["Job"], jobDefault2) + require.Equal(t, defaultPolicy.Target["Group"], "horizontally_scalable") + + defaultPolicyInfo, _, err := tc.Nomad().Scaling().GetPolicy(defaultPolicy.ID, &defaultQueryOpts) + require.NoError(t, err) + require.Equal(t, *defaultPolicyInfo.Min, int64(1)) + require.Equal(t, *defaultPolicyInfo.Max, int64(10)) + require.Equal(t, defaultPolicyInfo.Policy["cooldown"], "13m") + require.Equal(t, defaultPolicyInfo.Target["Namespace"], "default") + require.Equal(t, defaultPolicyInfo.Target["Job"], jobDefault2) + require.Equal(t, defaultPolicyInfo.Target["Group"], "horizontally_scalable") + + // Check response objects from the namespace with name "NamespaceA". + aPolicyList, _, err := tc.Nomad().Scaling().ListPolicies(&aQueryOpts) + require.NoError(t, err) + require.Len(t, aPolicyList, 1) + + aPolicy := aPolicyList[0] + require.True(t, aPolicy.Enabled) + require.Equal(t, "horizontal", aPolicy.Type) + require.Equal(t, aPolicy.Target["Namespace"], "NamespaceA") + require.Equal(t, aPolicy.Target["Job"], jobA) + require.Equal(t, aPolicy.Target["Group"], "horizontally_scalable") + + aPolicyInfo, _, err := tc.Nomad().Scaling().GetPolicy(aPolicy.ID, &aQueryOpts) + require.NoError(t, err) + require.Equal(t, *aPolicyInfo.Min, int64(1)) + require.Equal(t, *aPolicyInfo.Max, int64(10)) + require.Equal(t, aPolicyInfo.Policy["cooldown"], "13m") + require.Equal(t, aPolicyInfo.Target["Namespace"], "NamespaceA") + require.Equal(t, aPolicyInfo.Target["Job"], jobA) + require.Equal(t, aPolicyInfo.Target["Group"], "horizontally_scalable") + + // List policies using the splat namespace operator. + splatPolicyList, _, err := tc.Nomad().Scaling().ListPolicies(&api.QueryOptions{Namespace: "*"}) + require.NoError(t, err) + require.Len(t, splatPolicyList, 2) + + // Deregister the job from the "NamespaceA" namespace and then check the + // response objects. + _, _, err = tc.Nomad().Jobs().Deregister(jobA, true, &api.WriteOptions{Namespace: "NamespaceA"}) + require.NoError(t, err) + + for i, namespacedJob := range tc.namespacedJobIDs { + if namespacedJob[1] == jobA && namespacedJob[0] == "NamespaceA" { + tc.namespacedJobIDs = append(tc.namespacedJobIDs[:i], tc.namespacedJobIDs[i+1:]...) + break + } + } + + aPolicyList, _, err = tc.Nomad().Scaling().ListPolicies(&aQueryOpts) + require.NoError(t, err) + require.Len(t, aPolicyList, 0) + + // Update the running job scaling policy and ensure the changes are + // reflected. + err = e2eutil.Register(jobDefault2, "scalingpolicies/input/namespace_default_2.nomad") + require.NoError(t, err) + + defaultPolicyList, _, err = tc.Nomad().Scaling().ListPolicies(&defaultQueryOpts) + require.NoError(t, err) + require.Len(t, defaultPolicyList, 1) + + defaultPolicyInfo, _, err = tc.Nomad().Scaling().GetPolicy(defaultPolicyList[0].ID, &defaultQueryOpts) + require.NoError(t, err) + require.Equal(t, *defaultPolicyInfo.Min, int64(1)) + require.Equal(t, *defaultPolicyInfo.Max, int64(11)) + require.Equal(t, defaultPolicyInfo.Policy["cooldown"], "14m") + require.Equal(t, defaultPolicyInfo.Target["Namespace"], "default") + require.Equal(t, defaultPolicyInfo.Target["Job"], jobDefault2) + require.Equal(t, defaultPolicyInfo.Target["Group"], "horizontally_scalable") + +} + +// run is a helper which runs a job within a namespace, providing the caller +// with the generated jobID. +func (tc *ScalingPolicyE2ETest) run(f *framework.F, jobSpec, ns string, expected []string) string { + jobID := "test-scaling-policy-" + uuid.Generate()[0:8] + f.NoError(e2eutil.Register(jobID, jobSpec)) + tc.namespacedJobIDs = append(tc.namespacedJobIDs, [2]string{ns, jobID}) + f.NoError(e2eutil.WaitForAllocStatusExpected(jobID, ns, expected), "job should be running") + return jobID +}