Files
nomad/client/drain_test.go
James Rasell d5b2d5078b rpc: Generate node identities with node RPC handlers when needed. (#26165)
When a Nomad client registers or re-registers, the RPC handler will
generate and return a node identity if required. When an identity
is generated, the signing key ID will be stored within the node
object, to ensure a root key is not deleted while it is still in use.

During normal client operation it will periodically heartbeat to
the Nomad servers to indicate aliveness. The RPC handler that
is used for this action has also been updated to conditionally
perform identity generation. Performing it here means no extra RPC
handlers are required and we inherit the jitter in identity
generation from the heartbeat mechanism.

The identity generation check methods are performed on the RPC
request arguments, so they are scoped to the required behaviour and
can handle the nuance of each RPC. Failure to generate an identity
is considered terminal to the RPC call. The client will include
behaviour to retry this error, which is always caused by the
encrypter not being ready unless the servers' keyring has been
corrupted.
2025-07-01 16:07:21 +01:00

207 lines
5.7 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package client
import (
"context"
"testing"
"time"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
// TestClient_SelfDrainConfig is an integration test of the client's Leave
// method that exercises the behavior of the drain_on_shutdown configuration.
//
// The client is configured with a 10s drain deadline and IgnoreSystemJobs.
// Leave must return without error before the 10s test timeout. Afterwards the
// system job's allocation must still be running, while every other allocation
// must be complete.
func TestClient_SelfDrainConfig(t *testing.T) {
	ci.Parallel(t)

	srv, _, cleanupSRV := testServer(t, nil)
	defer cleanupSRV()
	testutil.WaitForLeader(t, srv.RPC)
	testutil.WaitForKeyring(t, srv.RPC, srv.Region())

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = srv
		c.DevMode = false
		c.Drain = &config.DrainConfig{
			Deadline:         10 * time.Second,
			IgnoreSystemJobs: true,
		}
	})
	defer cleanupC1()

	jobID := "service-job-" + uuid.Short()
	sysJobID := "system-job-" + uuid.Short()
	testSelfDrainSetup(t, srv, c1.Node().ID, jobID, sysJobID)

	t.Log("setup complete successful, self-draining node")
	testCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	// Buffered so the Leave goroutine can always deliver its result and exit,
	// even when the select below has already given up on the timeout branch.
	errCh := make(chan error, 1)
	go func() {
		errCh <- c1.Leave()
	}()

	select {
	case err := <-errCh:
		must.NoError(t, err)
	case <-testCtx.Done():
		t.Fatal("expected drain complete before deadline")
	}

	c1.allocLock.RLock()
	defer c1.allocLock.RUnlock()
	for _, runner := range c1.allocs {
		// IgnoreSystemJobs keeps the system alloc out of the drain, so only
		// the service job's alloc is expected to have been stopped.
		if runner.Alloc().JobID == sysJobID {
			must.Eq(t, structs.AllocClientStatusRunning, runner.AllocState().ClientStatus)
		} else {
			must.Eq(t, structs.AllocClientStatusComplete, runner.AllocState().ClientStatus)
		}
	}
}
// TestClient_SelfDrain_FailLocal is an integration test of the client's Leave
// method that exercises the behavior when the client loses connection with the
// server partway through a self-drain.
//
// Leave must finish before the 20s test timeout. It may either succeed, if the
// disconnect could not be interleaved, or fail with "self-drain exceeded
// deadline".
func TestClient_SelfDrain_FailLocal(t *testing.T) {
	ci.Parallel(t)

	srv, _, cleanupSRV := testServer(t, nil)
	defer cleanupSRV()
	testutil.WaitForLeader(t, srv.RPC)
	testutil.WaitForKeyring(t, srv.RPC, srv.Region())

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = srv
		c.DevMode = false
		c.Drain = &config.DrainConfig{Deadline: 5 * time.Second}
	})
	defer cleanupC1()

	nodeID := c1.Node().ID
	jobID := "service-job-" + uuid.Short()
	sysJobID := "system-job-" + uuid.Short()
	testSelfDrainSetup(t, srv, nodeID, jobID, sysJobID)

	t.Log("setup complete successful, self-draining node and disconnecting node from server")

	// note: this timeout has to cover the drain deadline plus the RPC timeout
	// when we fail to make the RPC to the leader
	testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
	defer cancel()

	// Buffered so the Leave goroutine can exit even if the test stops waiting.
	errCh := make(chan error, 1)
	go func() {
		errCh <- c1.Leave()
	}()

	// We want to disconnect the server so that self-drain is forced to fallback
	// to local drain behavior. But if we disconnect the server before we start
	// the self-drain, the drain won't happen at all. So this attempts to
	// interleave disconnecting the server between when the drain starts and the
	// server marks the drain successful.
	disconnectDone := make(chan struct{})

	// Stop and join the disconnect goroutine before the other deferred cleanups
	// (client and server shutdown) run. This ensures it never issues an RPC
	// against a stopped server, and never reports a failure after the test
	// function has returned.
	defer func() {
		cancel()
		<-disconnectDone
	}()

	go func() {
		defer close(disconnectDone)

		req := structs.NodeSpecificRequest{
			NodeID:       nodeID,
			QueryOptions: structs.QueryOptions{Region: srv.Region()},
		}
		for {
			select {
			case <-testCtx.Done():
				return
			default:
			}

			// Decode into a fresh response each time, so no state from a
			// previous poll can leak into this one.
			var out structs.SingleNodeResponse
			if err := srv.RPC("Node.GetNode", &req, &out); err != nil {
				// must.* aborts via t.Fatalf/FailNow, which is only valid on the
				// test goroutine; record the failure and stop polling instead.
				t.Errorf("failed to look up node %s: %v", nodeID, err)
				return
			}

			switch {
			case out.Node == nil:
				t.Errorf("node %s not found while waiting for drain", nodeID)
				return
			case out.Node.DrainStrategy != nil:
				cleanupSRV()
				return
			case out.Node.LastDrain != nil:
				return // the drain is already complete
			}
		}
	}()

	select {
	case err := <-errCh:
		if err != nil {
			// we might not be able to interleave the disconnection, so it's
			// possible the Leave works just fine
			must.EqError(t, err, "self-drain exceeded deadline")
		}
	case <-testCtx.Done():
		t.Fatal("expected drain complete before test timeout")
	}
}
// testSelfDrainSetup prepares a node for the self-drain tests. It first waits
// up to 5s for the node identified by nodeID to be visible via Node.GetNode on
// srv. It then registers a service job (jobID) and a system job (sysJobID) and
// waits for each to be running.
//
// Both jobs use a single mock_driver task configured with run_for "1m". The
// service job has a count of 1 and a migrate strategy with a 100ms
// MinHealthyTime.
func testSelfDrainSetup(t *testing.T, srv *nomad.Server, nodeID, jobID, sysJobID string) {
	t.Helper()

	req := structs.NodeSpecificRequest{
		NodeID:       nodeID,
		QueryOptions: structs.QueryOptions{Region: srv.Region()},
	}
	var out structs.SingleNodeResponse

	// Wait for the node to register before we drain
	must.Wait(t, wait.InitialSuccess(
		wait.BoolFunc(func() bool {
			err := srv.RPC("Node.GetNode", &req, &out)
			must.NoError(t, err)
			return out.Node != nil
		}),
		wait.Timeout(5*time.Second),
		wait.Gap(10*time.Millisecond),
	))

	// newMockTask builds the small, long-running mock task shared by both
	// jobs. It returns a new value on each call, so the jobs never alias the
	// same *structs.Task.
	newMockTask := func() *structs.Task {
		return &structs.Task{
			Name:      "mock",
			Driver:    "mock_driver",
			Config:    map[string]interface{}{"run_for": "1m"},
			LogConfig: structs.DefaultLogConfig(),
			Resources: &structs.Resources{
				CPU:      50,
				MemoryMB: 25,
			},
		}
	}

	// Run a job that starts quickly
	job := mock.Job()
	job.ID = jobID
	job.Constraints = nil
	job.TaskGroups[0].Constraints = nil
	job.TaskGroups[0].Count = 1
	job.TaskGroups[0].Migrate = nstructs.DefaultMigrateStrategy()
	job.TaskGroups[0].Migrate.MinHealthyTime = 100 * time.Millisecond
	job.TaskGroups[0].Networks = []*structs.NetworkResource{}
	job.TaskGroups[0].Tasks[0] = newMockTask()
	testutil.WaitForRunning(t, srv.RPC, job.Copy())

	// Run a system job on the same node, used to check IgnoreSystemJobs.
	sysJob := mock.SystemJob()
	sysJob.ID = sysJobID
	sysJob.Constraints = nil
	sysJob.TaskGroups[0].Constraints = nil
	sysJob.TaskGroups[0].Networks = []*structs.NetworkResource{}
	sysJob.TaskGroups[0].Tasks[0] = newMockTask()
	testutil.WaitForRunning(t, srv.RPC, sysJob.Copy())
}