node pools: protect against deleting occupied pools (#17457)

We don't want to delete node pools that still have nodes or non-terminal jobs. Add a
check to the `DeleteNodePools` RPC that verifies each pool is unused both locally and
in federated regions, similar to the check we run before deleting namespaces.
Authored by Tim Gross on 2023-06-13 09:57:42 -04:00; committed by GitHub
parent 887d3060c4
commit 2c77bf72f1
2 changed files with 230 additions and 10 deletions
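
For illustration only, here is a minimal sketch (not part of the diff) of how a caller trips the new guard. It reuses the `structs.NodePoolDeleteRequest` type and the `NodePool.DeleteNodePools` RPC name exercised in the tests below; the `srv` handle, the pool name, and the token value are placeholders.

// Attempt to delete a pool that still has registered nodes or non-terminal jobs.
req := &structs.NodePoolDeleteRequest{
	Names: []string{"busy-pool"}, // hypothetical pool name
	WriteRequest: structs.WriteRequest{
		Region:    "global",
		AuthToken: "<token with delete capability>", // placeholder
	},
}
var resp structs.GenericResponse
err := srv.RPC("NodePool.DeleteNodePools", req, &resp)
// With this change the RPC refuses the delete and err contains messages like:
//   node pool "busy-pool" has nodes in regions: [global]
//   node pool "busy-pool" has non-terminal jobs in regions: [global]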


@@ -11,6 +11,7 @@ import (
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
@@ -257,17 +258,115 @@ func (n *NodePool) DeleteNodePools(args *structs.NodePoolDeleteRequest, reply *s
}
}
-// TODO(luiz): verify that the node pool is not being used.
// Verify that the node pools we're deleting do not have nodes or
// non-terminal jobs in this region or in any federated region.
var mErr multierror.Error
for _, name := range args.Names {
regionsWithNonTerminal, regionsWithNodes, err := n.nodePoolRegionsInUse(args.AuthToken, name)
if err != nil {
_ = multierror.Append(&mErr, err)
}
if len(regionsWithNonTerminal) != 0 {
_ = multierror.Append(&mErr, fmt.Errorf(
"node pool %q has non-terminal jobs in regions: %v", name, regionsWithNonTerminal))
}
if len(regionsWithNodes) != 0 {
_ = multierror.Append(&mErr, fmt.Errorf(
"node pool %q has nodes in regions: %v", name, regionsWithNodes))
}
}
if err := mErr.ErrorOrNil(); err != nil {
return err
}
// Delete via Raft.
_, index, err := n.srv.raftApply(structs.NodePoolDeleteRequestType, args)
if err != nil {
return err
}
reply.Index = index
return nil
}
// nodePoolRegionsInUse returns the list of regions where the node pool is
// still in use by non-terminal jobs, and the list of regions where it is in
// use by nodes.
func (n *NodePool) nodePoolRegionsInUse(token, poolName string) ([]string, []string, error) {
regions := n.srv.Regions()
thisRegion := n.srv.Region()
hasNodes := make([]string, 0, len(regions))
hasNonTerminal := make([]string, 0, len(regions))
// Check if the pool is in use in this region.
snap, err := n.srv.State().Snapshot()
if err != nil {
return nil, nil, err
}
iter, err := snap.NodesByNodePool(nil, poolName)
if err != nil {
return nil, nil, err
}
found := iter.Next()
if found != nil {
hasNodes = append(hasNodes, thisRegion)
}
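// Also check for non-terminal jobs in the pool in this region.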
iter, err = snap.JobsByPool(nil, poolName)
if err != nil {
return nil, nil, err
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
job := raw.(*structs.Job)
if job.Status != structs.JobStatusDead {
hasNonTerminal = append(hasNonTerminal, thisRegion)
break
}
}
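// Then ask every other federated region whether the pool has nodes or
// non-terminal jobs there.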
for _, region := range regions {
if region == thisRegion {
continue
}
nodesReq := &structs.NodePoolNodesRequest{
Name: poolName,
QueryOptions: structs.QueryOptions{
Region: region,
AuthToken: token,
PerPage: 1, // we only care if there are any
},
}
var nodesResp structs.NodePoolNodesResponse
err := n.srv.RPC("NodePool.ListNodes", nodesReq, &nodesResp)
if err != nil {
return hasNonTerminal, hasNodes, err
}
if len(nodesResp.Nodes) != 0 {
hasNodes = append(hasNodes, region)
}
jobsReq := &structs.NodePoolJobsRequest{
Name: poolName,
QueryOptions: structs.QueryOptions{
Region: region,
AuthToken: token,
PerPage: 1, // we only care if there are any
Filter: `Status != "dead"`,
},
}
var jobsResp structs.NodePoolJobsResponse
err = n.srv.RPC("NodePool.ListJobs", jobsReq, &jobsResp)
if err != nil {
return hasNonTerminal, hasNodes, err
}
if len(jobsResp.Jobs) != 0 {
hasNonTerminal = append(hasNonTerminal, region)
}
}
return hasNonTerminal, hasNodes, err
}
// ListJobs is used to retrieve a list of jobs for a given node pool. It supports
// pagination and filtering.
func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.NodePoolJobsResponse) error {


@@ -883,6 +883,7 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
defer cleanupS()
codec := rpcClient(t, s)
store := s.fsm.State()
testutil.WaitForLeader(t, s.RPC)
// Insert a few node pools that we can delete.
@@ -891,6 +892,16 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
pools = append(pools, mock.NodePool())
}
// Insert a node and a job to block deletion of their pools.
node := mock.Node()
node.NodePool = pools[3].Name
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))
job := mock.MinJob()
job.NodePool = pools[4].Name
job.Status = structs.JobStatusRunning
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job))
testCases := []struct {
name string
pools []string
@@ -907,6 +918,18 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
pools[2].Name,
},
},
{
name: "delete pool occupied by node",
pools: []string{pools[3].Name},
expectedErr: fmt.Sprintf(
"node pool %q has nodes in regions: [global]", pools[3].Name),
},
{
name: "delete pool occupied by job",
pools: []string{pools[4].Name},
expectedErr: fmt.Sprintf(
"node pool %q has non-terminal jobs in regions: [global]", pools[4].Name),
},
{
name: "pool doesn't exist",
pools: []string{"doesnt-exist"},
@@ -936,7 +959,7 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
-err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)
+err := store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)
must.NoError(t, err)
req := &structs.NodePoolDeleteRequest{
@@ -955,7 +978,7 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
for _, pool := range tc.pools {
ws := memdb.NewWatchSet()
-got, err := s.fsm.State().NodePoolByName(ws, pool)
+got, err := store.NodePoolByName(ws, pool)
must.NoError(t, err)
must.Nil(t, got)
}
@@ -971,20 +994,21 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
defer cleanupS()
codec := rpcClient(t, s)
store := s.fsm.State()
testutil.WaitForLeader(t, s.RPC)
// Create test ACL tokens.
-devToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1001, "dev-node-pools",
+devToken := mock.CreatePolicyAndToken(t, store, 1001, "dev-node-pools",
mock.NodePoolPolicy("dev-*", "write", nil),
)
-devSpecificToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1003, "dev-1-node-pools",
+devSpecificToken := mock.CreatePolicyAndToken(t, store, 1003, "dev-1-node-pools",
mock.NodePoolPolicy("dev-1", "write", nil),
)
-prodToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1005, "prod-node-pools",
+prodToken := mock.CreatePolicyAndToken(t, store, 1005, "prod-node-pools",
mock.NodePoolPolicy("prod-*", "", []string{"delete"}),
)
-noPolicyToken := mock.CreateToken(t, s.fsm.State(), 1007, nil)
-noDeleteToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1009, "node-pools-no-delete",
+noPolicyToken := mock.CreateToken(t, store, 1007, nil)
+noDeleteToken := mock.CreatePolicyAndToken(t, store, 1009, "node-pools-no-delete",
mock.NodePoolPolicy("*", "", []string{"read", "write"}),
)
@@ -1004,6 +1028,16 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
pools = append(pools, qaPool)
}
// Insert a node and a job to block deletion of their pools.
node := mock.Node()
node.NodePool = "prod-3"
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))
job := mock.MinJob()
job.NodePool = "prod-4"
job.Status = structs.JobStatusRunning
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job))
testCases := []struct {
name string
token string
@@ -1067,11 +1101,23 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
pools: []string{"dev-1"},
expectedErr: structs.ErrPermissionDenied.Error(),
},
{
name: "no delete pool occupied by node",
token: root.SecretID,
pools: []string{"prod-3"},
expectedErr: "node pool \"prod-3\" has nodes in regions: [global]",
},
{
name: "no delete pool occupied by job",
token: root.SecretID,
pools: []string{"prod-4"},
expectedErr: "node pool \"prod-4\" has non-terminal jobs in regions: [global]",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
-err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)
+err := store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)
must.NoError(t, err)
req := &structs.NodePoolDeleteRequest{
@@ -1091,7 +1137,7 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
for _, pool := range tc.pools {
ws := memdb.NewWatchSet()
-got, err := s.fsm.State().NodePoolByName(ws, pool)
+got, err := store.NodePoolByName(ws, pool)
must.NoError(t, err)
must.Nil(t, got)
}
@@ -1100,6 +1146,81 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
}
}
func TestNodePoolEndpoint_DeleteNodePools_NonLocal(t *testing.T) {
ci.Parallel(t)
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.Region = "region1"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
})
defer cleanupS1()
codec := rpcClient(t, s1)
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
c.Region = "region2"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
c.ReplicationBackoff = 20 * time.Millisecond
c.ReplicationToken = root.SecretID
})
defer cleanupS2()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// Write a node pool to the authoritative region
np1 := mock.NodePool()
index, _ := s1.State().LatestIndex() // we need indexes to be correct here
must.NoError(t, s1.State().UpsertNodePools(
structs.MsgTypeTestSetup, index, []*structs.NodePool{np1}))
// Wait for the node pool to replicate
testutil.WaitForResult(func() (bool, error) {
store := s2.State()
out, err := store.NodePoolByName(nil, np1.Name)
return out != nil, err
}, func(err error) {
t.Fatalf("should replicate node pool")
})
// Create a job in the node pool on the non-authoritative region
job := mock.SystemJob()
job.NodePool = np1.Name
index, _ = s1.State().LatestIndex()
must.NoError(t, s2.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
// Deleting the node pool should fail
req := &structs.NodePoolDeleteRequest{
Names: []string{np1.Name},
WriteRequest: structs.WriteRequest{
Region: s1.Region(),
AuthToken: root.SecretID,
},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp)
must.ErrorContains(t, err, fmt.Sprintf(
"node pool %q has non-terminal jobs in regions: [%s]", np1.Name, s2.Region()))
// Stop the job; deleting the node pool should now succeed.
job = job.Copy()
job.Stop = true
index, _ = s1.State().LatestIndex()
index++
must.NoError(t, s2.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
must.NoError(t, msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp))
// Wait for the node pool deletion to replicate
testutil.WaitForResult(func() (bool, error) {
store := s2.State()
out, err := store.NodePoolByName(nil, np1.Name)
return out == nil, err
}, func(err error) {
t.Fatalf("should replicate node pool deletion")
})
}
func TestNodePoolEndpoint_ListJobs_ACLs(t *testing.T) {
ci.Parallel(t)