E2E: test for nodes disconnected by netsplit (#12407)
@@ -5,6 +5,7 @@ import (
	"testing"
	"time"

+	"github.com/hashicorp/go-multierror"
	"github.com/hashicorp/nomad/e2e/e2eutil"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/testutil"
@@ -17,13 +18,151 @@ const ns = ""
var wait30s = &e2eutil.WaitConfig{Interval: time.Second, Retries: 30}
var wait60s = &e2eutil.WaitConfig{Interval: time.Second, Retries: 60}

-func TestDisconnectedClients(t *testing.T) {
-	nomad := e2eutil.NomadClient(t)
+type expectedAllocStatus struct {
+	disconnected string
+	unchanged    string
+	replacement  string
+}
+
+func TestDisconnectedClients(t *testing.T) {
+
+	nomad := e2eutil.NomadClient(t)
	e2eutil.WaitForLeader(t, nomad)
	e2eutil.WaitForNodesReady(t, nomad, 2) // needs at least 2 to test replacement

-	t.Run("AllocReplacementOnShutdown", testDisconnected_AllocReplacementOnShutdown)
+	testCases := []struct {
+		name                    string
+		jobFile                 string
+		disconnectFn            func(string, time.Duration) (string, error)
+		expectedAfterDisconnect expectedAllocStatus
+		expectedAfterReconnect  expectedAllocStatus
+	}{
+		{
+			// test that allocations on clients that are netsplit and
+			// marked disconnected are replaced
+			name:         "netsplit client no max disconnect",
+			jobFile:      "./input/lost_simple.nomad",
+			disconnectFn: e2eutil.AgentDisconnect,
+			expectedAfterDisconnect: expectedAllocStatus{
+				disconnected: "lost",
+				unchanged:    "running",
+				replacement:  "running",
+			},
+			expectedAfterReconnect: expectedAllocStatus{
+				disconnected: "complete",
+				unchanged:    "running",
+				replacement:  "running",
+			},
+		},
+
+		{
+			// test that allocations on clients that are netsplit and
+			// marked disconnected are replaced but that the
+			// replacements are rolled back after reconnection
+			name:         "netsplit client with max disconnect",
+			jobFile:      "./input/lost_max_disconnect.nomad",
+			disconnectFn: e2eutil.AgentDisconnect,
+			expectedAfterDisconnect: expectedAllocStatus{
+				disconnected: "unknown",
+				unchanged:    "running",
+				replacement:  "running",
+			},
+			expectedAfterReconnect: expectedAllocStatus{
+				disconnected: "running",
+				unchanged:    "running",
+				replacement:  "complete",
+			},
+		},
+
+		{
+			// test that allocations on clients that are shutdown and
+			// marked disconnected are replaced
+			name:         "shutdown client no max disconnect",
+			jobFile:      "./input/lost_simple.nomad",
+			disconnectFn: e2eutil.AgentDisconnect,
+			expectedAfterDisconnect: expectedAllocStatus{
+				disconnected: "lost",
+				unchanged:    "running",
+				replacement:  "running",
+			},
+			expectedAfterReconnect: expectedAllocStatus{
+				disconnected: "complete",
+				unchanged:    "running",
+				replacement:  "running",
+			},
+		},
+
+		{
+			// test that allocations on clients that are shutdown and
+			// marked disconnected are replaced but that the
+			// replacements are rolled back after reconnection
+			name:         "shutdown client with max disconnect",
+			jobFile:      "./input/lost_max_disconnect.nomad",
+			disconnectFn: e2eutil.AgentDisconnect,
+			expectedAfterDisconnect: expectedAllocStatus{
+				disconnected: "unknown",
+				unchanged:    "running",
+				replacement:  "running",
+			},
+			expectedAfterReconnect: expectedAllocStatus{
+				disconnected: "running",
+				unchanged:    "running",
+				replacement:  "complete",
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+
+			jobIDs := []string{}
+			t.Cleanup(disconnectedClientsCleanup(t))
+			t.Cleanup(e2eutil.CleanupJobsAndGC(t, &jobIDs))
+
+			jobID := "test-disconnected-clients-" + uuid.Short()
+
+			err := e2eutil.Register(jobID, tc.jobFile)
+			require.NoError(t, err)
+			jobIDs = append(jobIDs, jobID)
+
+			err = e2eutil.WaitForAllocStatusExpected(jobID, ns,
+				[]string{"running", "running"})
+			require.NoError(t, err, "job should be running")
+
+			// pick one alloc to make our disconnected alloc (and its node)
+			allocs, err := e2eutil.AllocsForJob(jobID, ns)
+			require.NoError(t, err, "could not query allocs for job")
+			require.Len(t, allocs, 2, "could not find 2 allocs for job")
+
+			disconnectedAllocID := allocs[0]["ID"]
+			disconnectedNodeID := allocs[0]["Node ID"]
+			unchangedAllocID := allocs[1]["ID"]
+
+			// disconnect the node and wait for the results
+
+			restartJobID, err := tc.disconnectFn(disconnectedNodeID, 30*time.Second)
+			require.NoError(t, err, "expected agent disconnect job to register")
+			jobIDs = append(jobIDs, restartJobID)
+
+			err = e2eutil.WaitForNodeStatus(disconnectedNodeID, "disconnected", wait60s)
+			require.NoError(t, err, "expected node to go down")
+
+			require.NoError(t, waitForAllocStatusMap(
+				jobID, disconnectedAllocID, unchangedAllocID, tc.expectedAfterDisconnect, wait60s))
+
+			allocs, err = e2eutil.AllocsForJob(jobID, ns)
+			require.NoError(t, err, "could not query allocs for job")
+			require.Len(t, allocs, 3, "could not find 3 allocs for job")
+
+			// wait for the reconnect and wait for the results
+
+			err = e2eutil.WaitForNodeStatus(disconnectedNodeID, "ready", wait30s)
+			require.NoError(t, err, "expected node to come back up")
+			require.NoError(t, waitForAllocStatusMap(
+				jobID, disconnectedAllocID, unchangedAllocID, tc.expectedAfterReconnect, wait60s))
+		})
+	}
+
+}

// disconnectedClientsCleanup sets up a cleanup function to make sure
@@ -37,68 +176,12 @@ func disconnectedClientsCleanup(t *testing.T) func() {
	}
	return func() {
		nomad := e2eutil.NomadClient(t)
		t.Logf("waiting for %d nodes to become ready again", len(nodeIDs))
		e2eutil.WaitForNodesReady(t, nomad, len(nodeIDs))
	}
}

-// testDisconnected_AllocReplacementOnShutdown tests that allocations on
-// clients that are shut down and marked disconnected are replaced
-func testDisconnected_AllocReplacementOnShutdown(t *testing.T) {
-
-	jobIDs := []string{}
-	t.Cleanup(disconnectedClientsCleanup(t))
-	t.Cleanup(e2eutil.CleanupJobsAndGC(t, &jobIDs))
-
-	jobID := "test-lost-allocs-" + uuid.Short()
-
-	err := e2eutil.Register(jobID, "./input/lost_simple.nomad")
-	require.NoError(t, err)
-	jobIDs = append(jobIDs, jobID)
-
-	err = e2eutil.WaitForAllocStatusExpected(jobID, ns,
-		[]string{"running", "running"})
-	require.NoError(t, err, "job should be running")
-
-	// pick a node to make our lost node
-	allocs, err := e2eutil.AllocsForJob(jobID, ns)
-	require.NoError(t, err, "could not query allocs for job")
-	require.Len(t, allocs, 2, "could not find 2 allocs for job")
-
-	lostAlloc := allocs[0]
-	lostAllocID := lostAlloc["ID"]
-	disconnectedNodeID := lostAlloc["Node ID"]
-	otherAllocID := allocs[1]["ID"]
-
-	restartJobID, err := e2eutil.AgentRestartAfter(disconnectedNodeID, 30*time.Second)
-	require.NoError(t, err, "expected agent restart job to register")
-	jobIDs = append(jobIDs, restartJobID)
-
-	err = e2eutil.WaitForNodeStatus(disconnectedNodeID, "down", wait30s)
-	require.NoError(t, err, "expected node to go down")
-
-	err = waitForAllocStatusMap(jobID, map[string]string{
-		lostAllocID:  "lost",
-		otherAllocID: "running",
-		"":           "running",
-	}, wait60s)
-	require.NoError(t, err, "expected alloc on disconnected client to be marked lost and replaced")
-
-	allocs, err = e2eutil.AllocsForJob(jobID, ns)
-	require.NoError(t, err, "could not query allocs for job")
-	require.Len(t, allocs, 3, "could not find 3 allocs for job")
-
-	err = e2eutil.WaitForNodeStatus(disconnectedNodeID, "ready", wait30s)
-	require.NoError(t, err, "expected node to come back up")
-
-	err = waitForAllocStatusMap(jobID, map[string]string{
-		lostAllocID:  "complete",
-		otherAllocID: "running",
-		"":           "running",
-	}, wait30s)
-	require.NoError(t, err, "expected lost alloc on reconnected client to be marked complete and replaced")
-}
-func waitForAllocStatusMap(jobID string, allocsToStatus map[string]string, wc *e2eutil.WaitConfig) error {
+func waitForAllocStatusMap(jobID, disconnectedAllocID, unchangedAllocID string, expected expectedAllocStatus, wc *e2eutil.WaitConfig) error {
	var err error
	interval, retries := wc.OrDefault()
	testutil.WaitForResultRetries(retries, func() (bool, error) {
@@ -107,19 +190,35 @@ func waitForAllocStatusMap(jobID string, allocsToStatus map[string]string, wc *e
		if err != nil {
			return false, err
		}

+		var merr *multierror.Error
+
		for _, alloc := range allocs {
-			if expectedAllocStatus, ok := allocsToStatus[alloc["ID"]]; ok {
-				if alloc["Status"] != expectedAllocStatus {
-					return false, fmt.Errorf("expected status of alloc %q to be %q, got %q",
-						alloc["ID"], expectedAllocStatus, alloc["Status"])
-				}
-			} else {
-				if alloc["Status"] != allocsToStatus[""] {
-					return false, fmt.Errorf("expected status of alloc %q to be %q, got %q",
-						alloc["ID"], expectedAllocStatus, alloc["Status"])
-				}
-			}
+			switch allocID, allocStatus := alloc["ID"], alloc["Status"]; allocID {
+			case disconnectedAllocID:
+				if allocStatus != expected.disconnected {
+					merr = multierror.Append(merr, fmt.Errorf(
+						"disconnected alloc should be %q, got %q",
+						expected.disconnected, allocStatus))
+				}
+			case unchangedAllocID:
+				if allocStatus != expected.unchanged {
+					merr = multierror.Append(merr, fmt.Errorf(
+						"unchanged alloc should be %q, got %q",
+						expected.unchanged, allocStatus))
+				}
+			default:
+				if allocStatus != expected.replacement {
+					merr = multierror.Append(merr, fmt.Errorf(
+						"replacement alloc should be %q, got %q",
+						expected.replacement, allocStatus))
+				}
+			}
		}
+		if merr != nil {
+			return false, merr.ErrorOrNil()
+		}

		return true, nil
	}, func(e error) {
		err = e
e2e/disconnectedclients/input/lost_max_disconnect.nomad (new file, 37 lines)
@@ -0,0 +1,37 @@
job "lost_max_disconnect" {

  datacenters = ["dc1", "dc2"]

  group "group" {

    max_client_disconnect = "1h"

    count = 2

    constraint {
      attribute = "${attr.kernel.name}"
      value     = "linux"
    }

    constraint {
      operator = "distinct_hosts"
      value    = "true"
    }

    task "task" {
      driver = "docker"

      config {
        image   = "busybox:1"
        command = "httpd"
        args    = ["-v", "-f", "-p", "8001", "-h", "/var/www"]
      }

      resources {
        cpu    = 128
        memory = 128
      }
    }
  }

}
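The "no max disconnect" test cases above reference ./input/lost_simple.nomad, which is not shown in this view. A plausible sketch of that companion jobspec, assuming it mirrors the job above with the max_client_disconnect setting omitted (so allocations on a disconnected node are marked lost rather than unknown); the job name and exact layout are assumptions, not the committed file:

job "lost_simple" {

  datacenters = ["dc1", "dc2"]

  group "group" {

    # no max_client_disconnect here: allocs on a disconnected node are
    # expected to be marked "lost" and replaced, not resumed on reconnect
    count = 2

    constraint {
      attribute = "${attr.kernel.name}"
      value     = "linux"
    }

    constraint {
      operator = "distinct_hosts"
      value    = "true"
    }

    task "task" {
      driver = "docker"

      config {
        image   = "busybox:1"
        command = "httpd"
        args    = ["-v", "-f", "-p", "8001", "-h", "/var/www"]
      }

      resources {
        cpu    = 128
        memory = 128
      }
    }
  }
}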
e2e/e2eutil/input/disconnect-node.nomad (new file, 42 lines)
@@ -0,0 +1,42 @@
variable "nodeID" {
  type = string
}

variable "time" {
  type    = string
  default = "0"
}

job "disconnect-node" {
  type        = "batch"
  datacenters = ["dc1", "dc2"]

  group "group" {

    # need to prevent the task from being restarted on reconnect, if
    # we're stopped long enough for the node to be marked down
    max_client_disconnect = "1h"

    constraint {
      attribute = "${attr.kernel.name}"
      value     = "linux"
    }
    constraint {
      attribute = "${node.unique.id}"
      value     = "${var.nodeID}"
    }

    task "task" {
      driver = "raw_exec"
      config {
        command = "/bin/sh"
        args = ["-c",
          # before disconnecting, we need to sleep long enough for the
          # task to register itself, otherwise we end up trying to
          # re-run the task immediately on reconnect
          "sleep 5; iptables -I OUTPUT -p tcp --dport 4647 -j DROP; sleep ${var.time}; iptables -D OUTPUT -p tcp --dport 4647 -j DROP"]
      }
    }

  }
}
@@ -13,6 +13,10 @@ job "restart-node" {

  group "group" {

+    # need to prevent the task from being restarted on reconnect, if
+    # we're stopped long enough for the node to be marked down
+    max_client_disconnect = "1h"
+
    constraint {
      attribute = "${attr.kernel.name}"
      value     = "linux"
@@ -27,7 +31,10 @@ job "restart-node" {
    config {
      command = "/bin/sh"
      args = ["-c",
-        "systemctl stop nomad; sleep ${var.time}; systemctl start nomad"]
+        # before disconnecting, we need to sleep long enough for the
+        # task to register itself, otherwise we end up trying to
+        # re-run the task immediately on reconnect
+        "sleep 5; systemctl stop nomad; sleep ${var.time}; systemctl start nomad"]
    }
  }

@@ -10,6 +10,37 @@ import (
	"github.com/hashicorp/nomad/testutil"
)

+// AgentDisconnect is a test helper function that runs a raw_exec job
+// that will disconnect a client at the network level and reconnect it
+// after the specified period of time.
+//
+// Returns once the job is registered, with the job ID of the disconnect
+// job and any registration errors, not after the duration, so that
+// callers can take actions while the client is down.
+func AgentDisconnect(nodeID string, after time.Duration) (string, error) {
+	jobID := "disconnect-" + nodeID
+	vars := []string{"-var", "nodeID=" + nodeID}
+	if after > 0 {
+		vars = append(vars, "-var", fmt.Sprintf("time=%d", int(after.Seconds())))
+	}
+
+	jobFilePath := "../e2eutil/input/disconnect-node.nomad"
+
+	// TODO: temporary hack around having older tests running on the
+	// framework vs new tests not, as the framework has a different
+	// working directory
+	dir, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+	if filepath.Base(dir) == "e2e" {
+		jobFilePath = "e2eutil/input/disconnect-node.nomad"
+	}
+
+	err = RegisterWithArgs(jobID, jobFilePath, vars...)
+	return jobID, err
+}
+
// AgentRestartAfter is a test helper function that runs a raw_exec
// job that will stop a client and restart it after the specified
// period of time. The node must be running under systemd.