Mirror of https://github.com/kemko/nomad.git, synced 2026-01-03 17:05:43 +03:00
Adds a new configuration to clients to optionally allow them to drain their workloads on shutdown. The client sends the `Node.UpdateDrain` RPC targeting itself and then monitors the drain state as seen by the server until the drain is complete or the deadline expires. If it loses connection with the server, it will monitor local client status instead to ensure allocations are stopped before exiting.
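As a rough sketch of that flow (not the code from this change), a client-side self-drain built on Nomad's existing RPCs might look like the following. The selfDrain function and the rpcFn callback type are illustrative assumptions; Node.UpdateDrain is the RPC named in the description above, Node.GetNode is the one the tests below poll, and the structs types are real Nomad API types.

// Hedged sketch only: the real implementation lives in the client package.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/hashicorp/nomad/nomad/structs"
)

// rpcFn matches the usual Nomad RPC helper signature (an assumption here).
type rpcFn func(method string, args interface{}, reply interface{}) error

// selfDrain asks the servers to drain this node, then polls the server's
// view of the node until the drain strategy is cleared or ctx expires.
func selfDrain(ctx context.Context, rpc rpcFn, nodeID string, deadline time.Duration, ignoreSystemJobs bool) error {
    dreq := &structs.NodeUpdateDrainRequest{
        NodeID: nodeID,
        DrainStrategy: &structs.DrainStrategy{
            DrainSpec: structs.DrainSpec{
                Deadline:         deadline,
                IgnoreSystemJobs: ignoreSystemJobs,
            },
        },
        MarkEligible: false, // the node is shutting down, so keep it ineligible
        WriteRequest: structs.WriteRequest{Region: "global"},
    }
    var dresp structs.NodeDrainUpdateResponse
    if err := rpc("Node.UpdateDrain", dreq, &dresp); err != nil {
        return err // caller would fall back to watching local allocations
    }

    greq := &structs.NodeSpecificRequest{
        NodeID:       nodeID,
        QueryOptions: structs.QueryOptions{Region: "global"},
    }
    for {
        select {
        case <-ctx.Done():
            return fmt.Errorf("self-drain exceeded deadline")
        case <-time.After(time.Second):
        }
        var out structs.SingleNodeResponse
        if err := rpc("Node.GetNode", greq, &out); err != nil {
            return err // lost the server: fall back to local alloc monitoring
        }
        if out.Node != nil && out.Node.DrainStrategy == nil {
            return nil // the server considers the drain complete
        }
    }
}

The "self-drain exceeded deadline" error string is the one the second test below expects when the drain cannot finish before the configured deadline.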
202 lines · 5.5 KiB · Go
package client

import (
    "context"
    "testing"
    "time"

    "github.com/shoenig/test/must"
    "github.com/shoenig/test/wait"

    "github.com/hashicorp/nomad/ci"
    "github.com/hashicorp/nomad/client/config"
    "github.com/hashicorp/nomad/helper/uuid"
    "github.com/hashicorp/nomad/nomad"
    "github.com/hashicorp/nomad/nomad/mock"
    "github.com/hashicorp/nomad/nomad/structs"
    nstructs "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/testutil"
)

// TestClient_SelfDrainConfig is an integration test of the client's Leave
// method that exercises the behavior of the drain_on_shutdown configuration.
func TestClient_SelfDrainConfig(t *testing.T) {
    ci.Parallel(t)

    srv, _, cleanupSRV := testServer(t, nil)
    defer cleanupSRV()
    testutil.WaitForLeader(t, srv.RPC)

    c1, cleanupC1 := TestClient(t, func(c *config.Config) {
        c.RPCHandler = srv
        c.DevMode = false
        c.Drain = &config.DrainConfig{
            Deadline:         10 * time.Second,
            IgnoreSystemJobs: true,
        }
    })
    defer cleanupC1()

    jobID := "service-job-" + uuid.Short()
    sysJobID := "system-job-" + uuid.Short()
    testSelfDrainSetup(t, srv, c1.Node().ID, jobID, sysJobID)
    t.Log("setup complete, self-draining node")

    testCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()

    errCh := make(chan error)
    go func() {
        errCh <- c1.Leave()
    }()

    select {
    case err := <-errCh:
        must.NoError(t, err)
    case <-testCtx.Done():
        t.Fatal("expected drain complete before deadline")
    }

    // The system job should still be running because IgnoreSystemJobs is
    // set; every other allocation should have been drained to completion.
    c1.allocLock.RLock()
    defer c1.allocLock.RUnlock()
    for _, runner := range c1.allocs {
        if runner.Alloc().JobID == sysJobID {
            must.Eq(t, structs.AllocClientStatusRunning, runner.AllocState().ClientStatus)
        } else {
            must.Eq(t, structs.AllocClientStatusComplete, runner.AllocState().ClientStatus)
        }
    }
}

// TestClient_SelfDrain_FailLocal is an integration test of the client's Leave
// method that exercises the behavior when the client loses its connection to
// the server.
func TestClient_SelfDrain_FailLocal(t *testing.T) {
    ci.Parallel(t)

    srv, _, cleanupSRV := testServer(t, nil)
    defer cleanupSRV()
    testutil.WaitForLeader(t, srv.RPC)

    c1, cleanupC1 := TestClient(t, func(c *config.Config) {
        c.RPCHandler = srv
        c.DevMode = false
        c.Drain = &config.DrainConfig{Deadline: 5 * time.Second}
    })
    defer cleanupC1()

    jobID := "service-job-" + uuid.Short()
    sysJobID := "system-job-" + uuid.Short()
    testSelfDrainSetup(t, srv, c1.Node().ID, jobID, sysJobID)

    t.Log("setup complete, self-draining node and disconnecting node from server")

    // Note: this timeout has to cover the drain deadline plus the RPC timeout
    // we hit when we fail to make the RPC to the leader.
    testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
    defer cancel()

    errCh := make(chan error)
    go func() {
        errCh <- c1.Leave()
    }()

    // We want to disconnect the server so that self-drain is forced to fall
    // back to local drain behavior. But if we disconnect the server before we
    // start the self-drain, the drain won't happen at all. So this attempts to
    // interleave disconnecting the server between when the drain starts and
    // when the server marks the drain successful.
    go func() {
        req := structs.NodeSpecificRequest{
            NodeID:       c1.Node().ID,
            QueryOptions: structs.QueryOptions{Region: "global"},
        }
        var out structs.SingleNodeResponse
        for {
            select {
            case <-testCtx.Done():
                return
            default:
            }
            err := srv.RPC("Node.GetNode", &req, &out)
            must.NoError(t, err)
            if out.Node.DrainStrategy != nil {
                // The drain has started but not yet finished: shut the server
                // down so the client must fall back to local drain behavior.
                cleanupSRV()
                return
            } else if out.Node.LastDrain != nil {
                return // the drain is already complete
            }
        }
    }()

    select {
    case err := <-errCh:
        if err != nil {
            // We might not be able to interleave the disconnection, so it's
            // possible the Leave works just fine.
            must.EqError(t, err, "self-drain exceeded deadline")
        }
    case <-testCtx.Done():
        t.Fatal("expected drain complete before test timeout")
    }
}

func testSelfDrainSetup(t *testing.T, srv *nomad.Server, nodeID, jobID, sysJobID string) {
    req := structs.NodeSpecificRequest{
        NodeID:       nodeID,
        QueryOptions: structs.QueryOptions{Region: "global"},
    }
    var out structs.SingleNodeResponse

    // Wait for the node to register before we drain
    must.Wait(t, wait.InitialSuccess(
        wait.BoolFunc(func() bool {
            err := srv.RPC("Node.GetNode", &req, &out)
            must.NoError(t, err)
            return out.Node != nil
        }),
        wait.Timeout(5*time.Second),
        wait.Gap(10*time.Millisecond),
    ))

    // Run a service job that starts quickly
    job := mock.Job()
    job.ID = jobID
    job.Constraints = nil
    job.TaskGroups[0].Constraints = nil
    job.TaskGroups[0].Count = 1
    job.TaskGroups[0].Migrate = nstructs.DefaultMigrateStrategy()
    job.TaskGroups[0].Migrate.MinHealthyTime = 100 * time.Millisecond
    job.TaskGroups[0].Networks = []*structs.NetworkResource{}
    job.TaskGroups[0].Tasks[0] = &structs.Task{
        Name:      "mock",
        Driver:    "mock_driver",
        Config:    map[string]interface{}{"run_for": "1m"},
        LogConfig: structs.DefaultLogConfig(),
        Resources: &structs.Resources{
            CPU:      50,
            MemoryMB: 25,
        },
    }
    testutil.WaitForRunning(t, srv.RPC, job.Copy())

    // Run a system job as well, so the tests can verify IgnoreSystemJobs
    sysJob := mock.SystemJob()
    sysJob.ID = sysJobID
    sysJob.Constraints = nil
    sysJob.TaskGroups[0].Constraints = nil
    sysJob.TaskGroups[0].Networks = []*structs.NetworkResource{}
    sysJob.TaskGroups[0].Tasks[0] = &structs.Task{
        Name:      "mock",
        Driver:    "mock_driver",
        Config:    map[string]interface{}{"run_for": "1m"},
        LogConfig: structs.DefaultLogConfig(),
        Resources: &structs.Resources{
            CPU:      50,
            MemoryMB: 25,
        },
    }
    testutil.WaitForRunning(t, srv.RPC, sysJob.Copy())
}
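These are standard Go tests, so from a checkout of this repository they can be run with something like go test ./client -run TestClient_SelfDrain -v (the ./client package path is an assumption based on the package client declaration). The testServer and TestClient helpers they rely on are part of the client package's existing test harness rather than this file.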