e2e: test for allocations replacement on disconnected clients (#12375)
This test exercises the behavior of clients that become disconnected and have their allocations replaced. Future test cases will exercise the `max_client_disconnect` field on the job spec.
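For context, `max_client_disconnect` is a task-group-level duration; a follow-up variant of the test job would add something along these lines (a sketch of the future case only, not part of this change):

    group "group" {
      count = 2

      # keep allocations on a disconnected client reconnectable for up to
      # five minutes instead of immediately marking them lost
      max_client_disconnect = "5m"
      ...
    }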
e2e/disconnectedclients/disconnectedclients.go (new file, 138 lines)
@@ -0,0 +1,138 @@
package disconnectedclients

import (
	"fmt"
	"os"
	"time"

	"github.com/hashicorp/nomad/e2e/e2eutil"
	"github.com/hashicorp/nomad/e2e/framework"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/testutil"
)

type DisconnectedClientsE2ETest struct {
	framework.TC
	jobIDs  []string
	nodeIDs []string
}

const ns = ""

func init() {
	framework.AddSuites(&framework.TestSuite{
		Component:   "DisconnectedClients",
		CanRunLocal: true,
		Cases: []framework.TestCase{
			new(DisconnectedClientsE2ETest),
		},
	})
}

func (tc *DisconnectedClientsE2ETest) BeforeAll(f *framework.F) {
	e2eutil.WaitForLeader(f.T(), tc.Nomad())
	e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 2) // needs at least 2 to test replacement

	nodeStatuses, err := e2eutil.NodeStatusList()
	f.NoError(err)
	for _, nodeStatus := range nodeStatuses {
		tc.nodeIDs = append(tc.nodeIDs, nodeStatus["ID"])
	}
}

func (tc *DisconnectedClientsE2ETest) AfterEach(f *framework.F) {
	if os.Getenv("NOMAD_TEST_SKIPCLEANUP") == "1" {
		return
	}

	for _, id := range tc.jobIDs {
		_, err := e2eutil.Command("nomad", "job", "stop", "-purge", id)
		f.Assert().NoError(err)
	}
	tc.jobIDs = []string{}

	_, err := e2eutil.Command("nomad", "system", "gc")
	f.Assert().NoError(err)

	// make sure we've waited for all the nodes to come back up
	e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), len(tc.nodeIDs))
}

// TestDisconnectedClients_AllocReplacement tests that allocations on
// disconnected clients are replaced
func (tc *DisconnectedClientsE2ETest) TestDisconnectedClients_AllocReplacement(f *framework.F) {
	jobID := "test-lost-allocs-" + uuid.Generate()[0:8]

	f.NoError(e2eutil.Register(jobID, "disconnectedclients/input/lost_simple.nomad"))
	tc.jobIDs = append(tc.jobIDs, jobID)
	f.NoError(e2eutil.WaitForAllocStatusExpected(jobID, ns,
		[]string{"running", "running"}), "job should be running")

	// pick a node to make our lost node
	allocs, err := e2eutil.AllocsForJob(jobID, ns)
	f.NoError(err, "could not query allocs for job")
	f.Len(allocs, 2, "could not find 2 allocs for job")

	lostAlloc := allocs[0]
	lostAllocID := lostAlloc["ID"]
	disconnectedNodeID := lostAlloc["Node ID"]
otherAllocID := allocs[0]["ID"]

	restartJobID, err := e2eutil.AgentRestartAfter(disconnectedNodeID, 30*time.Second)
	f.NoError(err, "expected agent restart job to register")
	tc.jobIDs = append(tc.jobIDs, restartJobID)

	err = e2eutil.WaitForNodeStatus(disconnectedNodeID, "down", nil)
	f.NoError(err, "expected node to go down")

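	// the "" key in the status map is the catch-all: any alloc not listed by
	// ID (here, the replacement the scheduler creates) must have this status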
	err = waitForAllocStatusMap(jobID, map[string]string{
		lostAllocID:  "lost",
		otherAllocID: "running",
		"":           "running",
	}, &e2eutil.WaitConfig{Interval: time.Second, Retries: 60})
	f.NoError(err, "expected alloc on disconnected client to be marked lost and replaced")

	allocs, err = e2eutil.AllocsForJob(jobID, ns)
	f.NoError(err, "could not query allocs for job")
	f.Len(allocs, 3, "could not find 3 allocs for job")

	err = e2eutil.WaitForNodeStatus(disconnectedNodeID, "ready", nil)
	f.NoError(err, "expected node to come back up")

	err = waitForAllocStatusMap(jobID, map[string]string{
		lostAllocID:  "dead",
		otherAllocID: "running",
		"":           "running",
	}, &e2eutil.WaitConfig{Interval: time.Second, Retries: 30})
	f.NoError(err, "expected lost alloc on reconnected client to be marked dead and replaced")
}

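// waitForAllocStatusMap polls the allocations for jobID until every alloc
// listed by ID in allocsToStatus has the expected client status; allocs not
// listed by ID are checked against the status stored under the "" key.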
func waitForAllocStatusMap(jobID string, allocsToStatus map[string]string, wc *e2eutil.WaitConfig) error {
	var err error
	interval, retries := wc.OrDefault()
	testutil.WaitForResultRetries(retries, func() (bool, error) {
		time.Sleep(interval)
		allocs, err := e2eutil.AllocsForJob(jobID, ns)
		if err != nil {
			return false, err
		}
		for _, alloc := range allocs {
			if expectedAllocStatus, ok := allocsToStatus[alloc["ID"]]; ok {
				if alloc["Status"] != expectedAllocStatus {
					return false, fmt.Errorf("expected status of alloc %q to be %q, got %q",
						alloc["ID"], expectedAllocStatus, alloc["Status"])
				}
			} else {
				if alloc["Status"] != allocsToStatus[""] {
					return false, fmt.Errorf("expected status of alloc %q to be %q, got %q",
						alloc["ID"], allocsToStatus[""], alloc["Status"])
				}
			}
		}
		return true, nil
	}, func(e error) {
		err = e
	})
	return err
}
e2e/disconnectedclients/input/lost_simple.nomad (new file, 34 lines)
@@ -0,0 +1,34 @@
job "lost_simple" {

  datacenters = ["dc1", "dc2"]

  group "group" {

    count = 2

    constraint {
      attribute = "${attr.kernel.name}"
      value     = "linux"
    }

    constraint {
      operator = "distinct_hosts"
      value    = "true"
    }

    task "task" {
      driver = "docker"

      config {
        image   = "busybox:1"
        command = "httpd"
        args    = ["-v", "-f", "-p", "8001", "-h", "/var/www"]
      }

      resources {
        cpu    = 128
        memory = 128
      }
    }
  }
}
@@ -15,6 +15,7 @@ import (
	_ "github.com/hashicorp/nomad/e2e/consultemplate"
	_ "github.com/hashicorp/nomad/e2e/csi"
	_ "github.com/hashicorp/nomad/e2e/deployment"
	_ "github.com/hashicorp/nomad/e2e/disconnectedclients"
	_ "github.com/hashicorp/nomad/e2e/eval_priority"
	_ "github.com/hashicorp/nomad/e2e/events"
	_ "github.com/hashicorp/nomad/e2e/example"
e2e/e2eutil/input/restart-node.nomad (new file, 35 lines)
@@ -0,0 +1,35 @@
variable "nodeID" {
  type = string
}

variable "time" {
  type    = string
  default = "0"
}

job "restart-node" {
  type        = "batch"
  datacenters = ["dc1", "dc2"]

  group "group" {

    constraint {
      attribute = "${attr.kernel.name}"
      value     = "linux"
    }
    constraint {
      attribute = "${node.unique.id}"
      value     = "${var.nodeID}"
    }

    task "task" {
      driver = "raw_exec"
      config {
        command = "/bin/sh"
        args = ["-c",
          "systemctl stop nomad; sleep ${var.time}; systemctl start nomad"]
      }
    }

  }
}
@@ -5,27 +5,33 @@ import (
	"time"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/testutil"
)

// AgentRestartAfter is a test helper function that runs a raw_exec
// job that will stop a client and restart it after the specified
// period of time. The node must be running under systemd.
//
// Returns once the job is registered with the job ID of the restart
// job and any registration errors, not after the duration, so that
// callers can take actions while the client is down.
func AgentRestartAfter(nodeID string, after time.Duration) (string, error) {
	jobID := "restart-" + nodeID
	vars := []string{"-var", "nodeID=" + nodeID}
	if after > 0 {
		vars = append(vars, "-var", fmt.Sprintf("time=%d", int(after.Seconds())))
	}

	err := RegisterWithArgs(jobID, "e2eutil/input/restart-node.nomad", vars...)
	return jobID, err
}

// AgentRestart is a test helper function that restarts a client node
// running under systemd using a raw_exec job. Returns the job ID of
// the restart job so that callers can clean it up.
func AgentRestart(client *api.Client, nodeID string) (string, error) {
	ok, err := isUbuntu(client, nodeID)
	if !ok {
		// TODO(tgross): we're checking this because we want to use
		// systemctl to restart the node, but we should also figure
		// out a way to detect dev mode targets.
		return "", fmt.Errorf("AgentRestart only works against ubuntu targets")
	}
	if err != nil {
		return "", err
	}

	job := newRestartJob(nodeID)
	jobID := *job.ID
	_, _, err = client.Jobs().Register(job, nil)
	jobID, err := AgentRestartAfter(nodeID, 0)
	if err != nil {
		return jobID, err
	}
@@ -62,57 +68,6 @@ func AgentRestart(client *api.Client, nodeID string) (string, error) {
	return jobID, fmt.Errorf("node did not become ready: %v", reasonErr)
}

func isUbuntu(client *api.Client, nodeID string) (bool, error) {
	node, _, err := client.Nodes().Info(nodeID, nil)
	if err != nil || node == nil {
		return false, err
	}
	if name, ok := node.Attributes["os.name"]; ok {
		return name == "ubuntu", nil
	}
	return false, nil
}

func newRestartJob(nodeID string) *api.Job {
	jobType := "batch"
	name := "restart"
	jobID := "restart-" + uuid.Generate()[0:8]
	attempts := 0
	job := &api.Job{
		Name:        &name,
		ID:          &jobID,
		Datacenters: []string{"dc1"},
		Type:        &jobType,
		TaskGroups: []*api.TaskGroup{
			{
				Name: &name,
				Constraints: []*api.Constraint{
					{
						LTarget: "${node.unique.id}",
						RTarget: nodeID,
						Operand: "=",
					},
				},
				RestartPolicy: &api.RestartPolicy{
					Attempts: &attempts,
				},
				Tasks: []*api.Task{
					{
						Name:   name,
						Driver: "raw_exec",
						Config: map[string]interface{}{
							"command": "systemctl",
							"args":    []string{"restart", "nomad"},
						},
					},
				},
			},
		},
	}
	job.Canonicalize()
	return job
}
// ListWindowsClientNodes returns a list of Windows client IDs, so that tests
// can skip operating-system-specific tests if there are no Windows clients
// available. Returns an error only on client errors.
@@ -184,3 +139,29 @@ func NodeStatusListFiltered(filterFn func(string) bool) ([]map[string]string, error) {

	return nodes, nil
}

func WaitForNodeStatus(nodeID, status string, wc *WaitConfig) error {
	var got string
	var err error
	interval, retries := wc.OrDefault()
	testutil.WaitForResultRetries(retries, func() (bool, error) {
		time.Sleep(interval)

		nodeStatuses, err := NodeStatusList()
		if err != nil {
			return false, err
		}
		for _, nodeStatus := range nodeStatuses {
			if nodeStatus["ID"] == nodeID {
				got = nodeStatus["Status"]
				if got == status {
					return true, nil
				}
			}
		}
		return false, nil
	}, func(e error) {
		err = fmt.Errorf("node status check failed: got %#v", got)
	})
	return err
}