E2E: dynamic host volume tests for sticky volumes (#24869)

Add tests for dynamic host volumes where the claiming jobs have `volume.sticky =
true`. Includes a test for forced rescheduling and a test for node drain.

This changeset includes a new `e2e/v3`-style package for creating dynamic host
volumes so that it can be reused across other tests.
Tim Gross
2025-02-07 15:50:54 -05:00
committed by GitHub
parent a6523be478
commit 3f2d4000a6
7 changed files with 494 additions and 47 deletions
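For reference, a minimal sketch of how another e2e test could reuse the new
`volumes3` package. The package clause, test name, and volume spec path are
illustrative; the helper calls (`Create`, `WithClient`, `WithTimeout`,
`VolumeID`, `NodeID`) are the ones added in this commit:

```go
package example_test

import (
	"testing"
	"time"

	"github.com/hashicorp/nomad/e2e/e2eutil"
	"github.com/hashicorp/nomad/e2e/v3/volumes3"
)

// TestExample_ReusesVolumes3 is a hypothetical consumer of the new package.
func TestExample_ReusesVolumes3(t *testing.T) {
	nomad := e2eutil.NomadClient(t)

	// Create the dynamic host volume from a spec file and block until the
	// node has fingerprinted it and the server reports it as "ready".
	vol, cleanup := volumes3.Create(t, "input/volume-example.nomad.hcl",
		volumes3.WithClient(nomad),
		volumes3.WithTimeout(30*time.Second),
	)
	t.Cleanup(cleanup)

	t.Logf("created volume %q on node %q", vol.VolumeID(), vol.NodeID())
}
```

Because `Create` blocks until the volume reaches the desired state, the
returned `VolumeSubmission` can be inspected immediately with `VolumeID`,
`NodeID`, or `Get`.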


@@ -4,13 +4,16 @@
package dynamic_host_volumes
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/api"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/v3/jobs3"
"github.com/hashicorp/nomad/e2e/v3/volumes3"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)
@@ -24,54 +27,13 @@ func TestDynamicHostVolumes_CreateWorkflow(t *testing.T) {
e2eutil.WaitForLeader(t, nomad)
e2eutil.WaitForNodesReady(t, nomad, 1)
out, err := e2eutil.Command("nomad", "volume", "create",
"-detach", "input/volume-create.nomad.hcl")
must.NoError(t, err)
split := strings.Split(out, " ")
volID := strings.TrimSpace(split[len(split)-1])
t.Logf("[%v] volume %q created", time.Since(start), volID)
t.Cleanup(func() {
_, err := e2eutil.Command("nomad", "volume", "delete", "-type", "host", volID)
must.NoError(t, err)
})
out, err = e2eutil.Command("nomad", "volume", "status", "-type", "host", volID)
must.NoError(t, err)
nodeID, err := e2eutil.GetField(out, "Node ID")
must.NoError(t, err)
must.NotEq(t, "", nodeID)
t.Logf("[%v] waiting for volume %q to be ready", time.Since(start), volID)
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
node, _, err := nomad.Nodes().Info(nodeID, nil)
if err != nil {
return err
}
_, ok := node.HostVolumes["created-volume"]
if !ok {
return fmt.Errorf("node %q did not fingerprint volume %q", nodeID, volID)
}
vol, _, err := nomad.HostVolumes().Get(volID, nil)
if err != nil {
return err
}
if vol.State != "ready" {
return fmt.Errorf("node fingerprinted volume but status was not updated")
}
t.Logf("[%v] volume %q is ready", time.Since(start), volID)
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
_, cleanupVol := volumes3.Create(t, "input/volume-create.nomad.hcl",
volumes3.WithClient(nomad))
t.Cleanup(cleanupVol)
t.Logf("[%v] submitting mounter job", time.Since(start))
_, cleanup := jobs3.Submit(t, "./input/mount-created.nomad.hcl")
t.Cleanup(cleanup)
_, cleanupJob := jobs3.Submit(t, "./input/mount-created.nomad.hcl")
t.Cleanup(cleanupJob)
t.Logf("[%v] test complete, cleaning up", time.Since(start))
}
@@ -184,3 +146,170 @@ func TestDynamicHostVolumes_RegisterWorkflow(t *testing.T) {
t.Cleanup(cleanup2)
t.Logf("[%v] test complete, cleaning up", time.Since(start))
}
// TestDynamicHostVolumes_StickyVolumes tests that when a job marks a volume
// as sticky, its allocations keep strong associations with specific volumes
// as they are replaced
func TestDynamicHostVolumes_StickyVolumes(t *testing.T) {
start := time.Now()
nomad := e2eutil.NomadClient(t)
e2eutil.WaitForLeader(t, nomad)
e2eutil.WaitForNodesReady(t, nomad, 2)
// TODO: if we create # of volumes == # of nodes, we can make test flakes
// stand out more easily
_, cleanup1 := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
volumes3.WithClient(nomad))
t.Cleanup(cleanup1)
_, cleanup2 := volumes3.Create(t, "input/volume-sticky.nomad.hcl",
volumes3.WithClient(nomad))
t.Cleanup(cleanup2)
t.Logf("[%v] submitting sticky volume mounter job", time.Since(start))
jobSub, cleanupJob := jobs3.Submit(t, "./input/sticky.nomad.hcl")
t.Cleanup(cleanupJob)
allocID1 := jobSub.Allocs()[0].ID
alloc, _, err := nomad.Allocations().Info(allocID1, nil)
must.NoError(t, err)
must.Len(t, 1, alloc.HostVolumeIDs)
selectedVolID := alloc.HostVolumeIDs[0]
selectedNodeID := alloc.NodeID
t.Logf("[%v] volume %q on node %q was selected",
time.Since(start), selectedVolID, selectedNodeID)
// Test: force reschedule
_, err = nomad.Allocations().Stop(alloc, nil)
must.NoError(t, err)
t.Logf("[%v] stopped allocation %q", time.Since(start), alloc.ID)
var allocID2 string
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
allocs, _, err := nomad.Jobs().Allocations(jobSub.JobID(), true, nil)
must.NoError(t, err)
if len(allocs) != 2 {
return fmt.Errorf("alloc not started")
}
for _, a := range allocs {
if a.ID != allocID1 {
allocID2 = a.ID
if a.ClientStatus != api.AllocClientStatusRunning {
return fmt.Errorf("replacement alloc not running")
}
}
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
newAlloc, _, err := nomad.Allocations().Info(allocID2, nil)
must.NoError(t, err)
must.Eq(t, []string{selectedVolID}, newAlloc.HostVolumeIDs)
must.Eq(t, selectedNodeID, newAlloc.NodeID)
t.Logf("[%v] replacement alloc %q is running", time.Since(start), newAlloc.ID)
// Test: drain node
t.Logf("[%v] draining node %q", time.Since(start), selectedNodeID)
cleanup, err := drainNode(nomad, selectedNodeID, time.Second*20)
t.Cleanup(cleanup)
must.NoError(t, err)
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
evals, _, err := nomad.Jobs().Evaluations(jobSub.JobID(), nil)
if err != nil {
return err
}
got := map[string]string{}
for _, eval := range evals {
got[eval.ID[:8]] = fmt.Sprintf("status=%q trigger=%q create_index=%d",
eval.Status,
eval.TriggeredBy,
eval.CreateIndex,
)
if eval.Status == nomadapi.EvalStatusBlocked {
return nil
}
}
return fmt.Errorf("expected blocked eval, got evals => %#v", got)
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
t.Logf("[%v] undraining node %q", time.Since(start), selectedNodeID)
cleanup()
var allocID3 string
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
allocs, _, err := nomad.Jobs().Allocations(jobSub.JobID(), true, nil)
must.NoError(t, err)
if len(allocs) != 3 {
return fmt.Errorf("alloc not started")
}
for _, a := range allocs {
if a.ID != allocID1 && a.ID != allocID2 {
allocID3 = a.ID
if a.ClientStatus != api.AllocClientStatusRunning {
return fmt.Errorf("replacement alloc %q not running", allocID3)
}
}
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(50*time.Millisecond),
))
newAlloc, _, err = nomad.Allocations().Info(allocID3, nil)
must.NoError(t, err)
must.Eq(t, []string{selectedVolID}, newAlloc.HostVolumeIDs)
must.Eq(t, selectedNodeID, newAlloc.NodeID)
t.Logf("[%v] replacement alloc %q is running", time.Since(start), newAlloc.ID)
}
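// drainNode marks the node ineligible and starts a drain, blocking until the
// drain completes or the timeout expires. The returned cleanup function makes
// the node eligible for scheduling again.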
func drainNode(nomad *nomadapi.Client, nodeID string, timeout time.Duration) (func(), error) {
resp, err := nomad.Nodes().UpdateDrainOpts(nodeID, &nomadapi.DrainOptions{
DrainSpec: &nomadapi.DrainSpec{},
MarkEligible: false,
}, nil)
if err != nil {
return func() {}, err
}
cleanup := func() {
nomad.Nodes().UpdateDrainOpts(nodeID, &nomadapi.DrainOptions{
MarkEligible: true}, nil)
}
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
drainCh := nomad.Nodes().MonitorDrain(ctx, nodeID, resp.EvalCreateIndex, false)
for {
select {
case <-ctx.Done():
// return the context error so a timed-out drain fails the test rather than
// silently returning a nil error
return cleanup, ctx.Err()
case msg := <-drainCh:
if msg == nil {
return cleanup, nil
}
}
}
}


@@ -7,6 +7,11 @@ job "example" {
group "web" {
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
network {
mode = "bridge"
port "www" {


@@ -7,6 +7,11 @@ job "example" {
group "web" {
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
network {
mode = "bridge"
port "www" {


@@ -0,0 +1,62 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1
job "example" {
# this job will get deployed and rescheduled a lot in this test, so make sure
# it happens as quickly as possible
update {
min_healthy_time = "1s"
}
reschedule {
delay = "5s"
delay_function = "constant"
unlimited = true
}
group "web" {
network {
mode = "bridge"
port "www" {
to = 8001
}
}
restart {
attempts = 0
mode = "fail"
}
volume "data" {
type = "host"
source = "sticky-volume"
sticky = true
}
task "http" {
driver = "docker"
config {
image = "busybox:1"
command = "httpd"
args = ["-v", "-f", "-p", "8001", "-h", "/var/www"]
ports = ["www"]
}
volume_mount {
volume = "data"
destination = "/var/www"
}
resources {
cpu = 128
memory = 128
}
}
}
}


@@ -9,3 +9,8 @@ capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}


@@ -0,0 +1,16 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1
name = "sticky-volume"
type = "host"
plugin_id = "mkdir"
capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

e2e/v3/volumes3/host3.go (new file, 225 lines)

@@ -0,0 +1,225 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package volumes3
import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/api"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/v3/util3"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)
// VolumeSubmission holds state around creating and cleaning up a dynamic host
// volume.
type VolumeSubmission struct {
t *testing.T
nomadClient *nomadapi.Client
// inputs
namespace string
filename string
waitState nomadapi.HostVolumeState
// behaviors
noCleanup bool
timeout time.Duration
verbose bool
// outputs
volID string
nodeID string
}
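// Option is a functional option for configuring a VolumeSubmission.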
type Option func(*VolumeSubmission)
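// Cleanup deletes the volume created by Create, unless cleanup has been
// disabled.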
type Cleanup func()
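// Create submits the volume spec with the `nomad volume create` CLI, waits
// for the volume to reach the configured state (fingerprinted and "ready" by
// default), and returns the submission along with a Cleanup that deletes the
// volume.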
func Create(t *testing.T, filename string, opts ...Option) (*VolumeSubmission, Cleanup) {
t.Helper()
sub := &VolumeSubmission{
t: t,
namespace: api.DefaultNamespace,
filename: filename,
waitState: nomadapi.HostVolumeStateReady,
timeout: 10 * time.Second,
}
for _, opt := range opts {
opt(sub)
}
start := time.Now()
sub.setClient() // setup API client if not configured by option
sub.run(start) // create the volume via API
sub.waits(start) // wait on node fingerprint
return sub, sub.cleanup
}
// VolumeID returns the volume ID set by the server
func (sub *VolumeSubmission) VolumeID() string {
return sub.volID
}
// NodeID returns the node ID, which may have been set by the server
func (sub *VolumeSubmission) NodeID() string {
return sub.nodeID
}
// Get fetches the api.HostVolume from the server for further examination
func (sub *VolumeSubmission) Get() *nomadapi.HostVolume {
vol, _, err := sub.nomadClient.HostVolumes().Get(sub.volID,
&api.QueryOptions{Namespace: sub.namespace})
must.NoError(sub.t, err)
return vol
}
func (sub *VolumeSubmission) setClient() {
if sub.nomadClient != nil {
return
}
nomadClient, err := nomadapi.NewClient(nomadapi.DefaultConfig())
must.NoError(sub.t, err, must.Sprint("failed to create nomad API client"))
sub.nomadClient = nomadClient
}
func (sub *VolumeSubmission) run(start time.Time) {
sub.t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), sub.timeout)
defer cancel()
bytes, err := exec.CommandContext(ctx,
"nomad", "volume", "create",
"-namespace", sub.namespace,
"-detach", sub.filename).CombinedOutput()
must.NoError(sub.t, err, must.Sprint("error creating volume"))
out := string(bytes)
split := strings.Split(out, " ")
sub.volID = strings.TrimSpace(split[len(split)-1])
sub.logf("[%v] volume %q created", time.Since(start), sub.VolumeID())
}
func (sub *VolumeSubmission) waits(start time.Time) {
sub.t.Helper()
must.Wait(sub.t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
vol, _, err := sub.nomadClient.HostVolumes().Get(sub.volID,
&api.QueryOptions{Namespace: sub.namespace})
if err != nil {
return err
}
sub.nodeID = vol.NodeID
if vol.State != sub.waitState {
return fmt.Errorf("volume is not yet in %q state: %q", sub.waitState, vol.State)
}
// if we're waiting for the volume to be ready, let's also verify
// that it's correctly fingerprinted on the node
switch sub.waitState {
case nomadapi.HostVolumeStateReady:
node, _, err := sub.nomadClient.Nodes().Info(sub.nodeID, nil)
if err != nil {
return err
}
_, ok := node.HostVolumes[vol.Name]
if !ok {
return fmt.Errorf("node %q did not fingerprint volume %q", sub.nodeID, sub.volID)
}
}
return nil
}),
wait.Timeout(sub.timeout),
wait.Gap(50*time.Millisecond),
))
sub.logf("[%v] volume %q is %q on node %q",
time.Since(start), sub.volID, sub.waitState, sub.nodeID)
}
func (sub *VolumeSubmission) cleanup() {
if os.Getenv("NOMAD_TEST_SKIPCLEANUP") == "1" {
return
}
if sub.noCleanup {
return
}
if sub.volID == "" {
return
}
sub.noCleanup = true // so this isn't attempted more than once
ctx, cancel := context.WithTimeout(context.Background(), sub.timeout)
defer cancel()
sub.logf("deleting volume %q", sub.volID)
err := exec.CommandContext(ctx,
"nomad", "volume", "delete",
"-type", "host", "-namespace", sub.namespace, sub.volID).Run()
must.NoError(sub.t, err)
}
func (sub *VolumeSubmission) logf(msg string, args ...any) {
sub.t.Helper()
util3.Log3(sub.t, sub.verbose, msg, args...)
}
// WithClient forces the submission to use the Nomad API client passed from the
// calling test
func WithClient(client *nomadapi.Client) Option {
return func(sub *VolumeSubmission) {
sub.nomadClient = client
}
}
// WithNamespace sets a specific namespace for the volume and the wait
// query. The namespace should not be set in the spec if you're using this
// option.
func WithNamespace(ns string) Option {
return func(sub *VolumeSubmission) {
sub.namespace = ns
}
}
// WithTimeout changes the default timeout from 10s
func WithTimeout(timeout time.Duration) Option {
return func(sub *VolumeSubmission) {
sub.timeout = timeout
}
}
// WithWaitState changes the state we wait for after creating the volume from
// the default of "ready"
func WithWaitState(state api.HostVolumeState) Option {
return func(sub *VolumeSubmission) {
sub.waitState = state
}
}
// WithNoCleanup is used for test debugging to skip tearing down the volume
func WithNoCleanup() Option {
return func(sub *VolumeSubmission) {
sub.noCleanup = true
}
}
// WithVerbose is used for test debugging to write more logs
func WithVerbose() Option {
return func(sub *VolumeSubmission) {
sub.verbose = true
}
}