From 2c77bf72f1aaba5397f09dbfb350fd7627a9997c Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 13 Jun 2023 09:57:42 -0400 Subject: [PATCH] node pools: protect against deleting occupied pools (#17457) We don't want to delete node pools that have nodes or non-terminal jobs. Add a check in the `DeleteNodePools` RPC to check locally and in federated regions, similar to how we check that it's safe to delete namespaces. --- nomad/node_pool_endpoint.go | 101 +++++++++++++++++++++- nomad/node_pool_endpoint_test.go | 139 +++++++++++++++++++++++++++++-- 2 files changed, 230 insertions(+), 10 deletions(-) diff --git a/nomad/node_pool_endpoint.go b/nomad/node_pool_endpoint.go index 5b2e50eed..008ebbb49 100644 --- a/nomad/node_pool_endpoint.go +++ b/nomad/node_pool_endpoint.go @@ -11,6 +11,7 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" @@ -257,17 +258,115 @@ func (n *NodePool) DeleteNodePools(args *structs.NodePoolDeleteRequest, reply *s } } - // TODO(luiz): verify that the node pool is not being used. + // Verify that the node pools we're deleting do not have nodes or + // non-terminal jobs in this region or in any federated region. + var mErr multierror.Error + for _, name := range args.Names { + regionsWithNonTerminal, regionsWithNodes, err := n.nodePoolRegionsInUse(args.AuthToken, name) + if err != nil { + _ = multierror.Append(&mErr, err) + } + if len(regionsWithNonTerminal) != 0 { + _ = multierror.Append(&mErr, fmt.Errorf( + "node pool %q has non-terminal jobs in regions: %v", name, regionsWithNonTerminal)) + } + if len(regionsWithNodes) != 0 { + _ = multierror.Append(&mErr, fmt.Errorf( + "node pool %q has nodes in regions: %v", name, regionsWithNodes)) + } + } + + if err := mErr.ErrorOrNil(); err != nil { + return err + } // Delete via Raft. _, index, err := n.srv.raftApply(structs.NodePoolDeleteRequestType, args) if err != nil { return err } + reply.Index = index return nil } +// nodePoolRegionsInUse returns a list of regions where the node pool is still +// in use for non-terminal jobs, and a list of regions where it is in use by +// nodes. 
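+// The local region is checked directly against a state store snapshot; every
+// other federated region is checked by forwarding NodePool.ListNodes and
+// NodePool.ListJobs RPCs with the caller's auth token.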
+func (n *NodePool) nodePoolRegionsInUse(token, poolName string) ([]string, []string, error) {
+	regions := n.srv.Regions()
+	thisRegion := n.srv.Region()
+	hasNodes := make([]string, 0, len(regions))
+	hasNonTerminal := make([]string, 0, len(regions))
+
+	// Check if the pool is in use in this region
+	snap, err := n.srv.State().Snapshot()
+	if err != nil {
+		return nil, nil, err
+	}
+	iter, err := snap.NodesByNodePool(nil, poolName)
+	if err != nil {
+		return nil, nil, err
+	}
+	found := iter.Next()
+	if found != nil {
+		hasNodes = append(hasNodes, thisRegion)
+	}
+	iter, err = snap.JobsByPool(nil, poolName)
+	if err != nil {
+		return nil, nil, err
+	}
+	for raw := iter.Next(); raw != nil; raw = iter.Next() {
+		job := raw.(*structs.Job)
+		if job.Status != structs.JobStatusDead {
+			hasNonTerminal = append(hasNonTerminal, thisRegion)
+			break
+		}
+	}
+
+	// Check every other federated region via its List RPCs
+	for _, region := range regions {
+		if region == thisRegion {
+			continue
+		}
+
+		nodesReq := &structs.NodePoolNodesRequest{
+			Name: poolName,
+			QueryOptions: structs.QueryOptions{
+				Region:    region,
+				AuthToken: token,
+				PerPage:   1, // we only care if there are any
+			},
+		}
+		var nodesResp structs.NodePoolNodesResponse
+		err := n.srv.RPC("NodePool.ListNodes", nodesReq, &nodesResp)
+		if err != nil {
+			return hasNonTerminal, hasNodes, err
+		}
+		if len(nodesResp.Nodes) != 0 {
+			hasNodes = append(hasNodes, region)
+		}
+
+		jobsReq := &structs.NodePoolJobsRequest{
+			Name: poolName,
+			QueryOptions: structs.QueryOptions{
+				Region:    region,
+				AuthToken: token,
+				PerPage:   1, // we only care if there are any
+				Filter:    `Status != "dead"`,
+			},
+		}
+		var jobsResp structs.NodePoolJobsResponse
+		err = n.srv.RPC("NodePool.ListJobs", jobsReq, &jobsResp)
+		if err != nil {
+			return hasNonTerminal, hasNodes, err
+		}
+		if len(jobsResp.Jobs) != 0 {
+			hasNonTerminal = append(hasNonTerminal, region)
+		}
+	}
+
+	return hasNonTerminal, hasNodes, nil
+}
+
 // ListJobs is used to retrieve a list of jobs for a given node pool. It supports
 // pagination and filtering.
 func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.NodePoolJobsResponse) error {
diff --git a/nomad/node_pool_endpoint_test.go b/nomad/node_pool_endpoint_test.go
index 72781da1e..1886f9edc 100644
--- a/nomad/node_pool_endpoint_test.go
+++ b/nomad/node_pool_endpoint_test.go
@@ -883,6 +883,7 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
 	defer cleanupS()
 	codec := rpcClient(t, s)
+	store := s.fsm.State()
 	testutil.WaitForLeader(t, s.RPC)
 
 	// Insert a few node pools that we can delete.
@@ -891,6 +892,16 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) { pools = append(pools, mock.NodePool()) } + // Insert a node and job to block deleting + node := mock.Node() + node.NodePool = pools[3].Name + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node)) + + job := mock.MinJob() + job.NodePool = pools[4].Name + job.Status = structs.JobStatusRunning + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)) + testCases := []struct { name string pools []string @@ -907,6 +918,18 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) { pools[2].Name, }, }, + { + name: "delete pool occupied by node", + pools: []string{pools[3].Name}, + expectedErr: fmt.Sprintf( + "node pool %q has nodes in regions: [global]", pools[3].Name), + }, + { + name: "delete pool occupied by job", + pools: []string{pools[4].Name}, + expectedErr: fmt.Sprintf( + "node pool %q has non-terminal jobs in regions: [global]", pools[4].Name), + }, { name: "pool doesn't exist", pools: []string{"doesnt-exist"}, @@ -936,7 +959,7 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools) + err := store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools) must.NoError(t, err) req := &structs.NodePoolDeleteRequest{ @@ -955,7 +978,7 @@ func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) { for _, pool := range tc.pools { ws := memdb.NewWatchSet() - got, err := s.fsm.State().NodePoolByName(ws, pool) + got, err := store.NodePoolByName(ws, pool) must.NoError(t, err) must.Nil(t, got) } @@ -971,20 +994,21 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) { defer cleanupS() codec := rpcClient(t, s) + store := s.fsm.State() testutil.WaitForLeader(t, s.RPC) // Create test ACL tokens. 
- devToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1001, "dev-node-pools", + devToken := mock.CreatePolicyAndToken(t, store, 1001, "dev-node-pools", mock.NodePoolPolicy("dev-*", "write", nil), ) - devSpecificToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1003, "dev-1-node-pools", + devSpecificToken := mock.CreatePolicyAndToken(t, store, 1003, "dev-1-node-pools", mock.NodePoolPolicy("dev-1", "write", nil), ) - prodToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1005, "prod-node-pools", + prodToken := mock.CreatePolicyAndToken(t, store, 1005, "prod-node-pools", mock.NodePoolPolicy("prod-*", "", []string{"delete"}), ) - noPolicyToken := mock.CreateToken(t, s.fsm.State(), 1007, nil) - noDeleteToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1009, "node-pools-no-delete", + noPolicyToken := mock.CreateToken(t, store, 1007, nil) + noDeleteToken := mock.CreatePolicyAndToken(t, store, 1009, "node-pools-no-delete", mock.NodePoolPolicy("*", "", []string{"read", "write"}), ) @@ -1004,6 +1028,16 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) { pools = append(pools, qaPool) } + // Insert a node and job to block deleting + node := mock.Node() + node.NodePool = "prod-3" + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node)) + + job := mock.MinJob() + job.NodePool = "prod-4" + job.Status = structs.JobStatusRunning + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)) + testCases := []struct { name string token string @@ -1067,11 +1101,23 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) { pools: []string{"dev-1"}, expectedErr: structs.ErrPermissionDenied.Error(), }, + { + name: "no delete pool occupied by node", + token: root.SecretID, + pools: []string{"prod-3"}, + expectedErr: "node pool \"prod-3\" has nodes in regions: [global]", + }, + { + name: "no delete pool occupied by job", + token: root.SecretID, + pools: []string{"prod-4"}, + expectedErr: "node pool \"prod-4\" has non-terminal jobs in regions: [global]", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools) + err := store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools) must.NoError(t, err) req := &structs.NodePoolDeleteRequest{ @@ -1091,7 +1137,7 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) { for _, pool := range tc.pools { ws := memdb.NewWatchSet() - got, err := s.fsm.State().NodePoolByName(ws, pool) + got, err := store.NodePoolByName(ws, pool) must.NoError(t, err) must.Nil(t, got) } @@ -1100,6 +1146,81 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) { } } +func TestNodePoolEndpoint_DeleteNodePools_NonLocal(t *testing.T) { + ci.Parallel(t) + + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer cleanupS1() + codec := rpcClient(t, s1) + + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + c.ReplicationToken = root.SecretID + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // Write a node pool to the authoritative region + np1 := mock.NodePool() + index, _ := s1.State().LatestIndex() // we need indexes to be correct here + must.NoError(t, s1.State().UpsertNodePools( + structs.MsgTypeTestSetup, index, 
[]*structs.NodePool{np1}))
+
+	// Wait for the node pool to replicate
+	testutil.WaitForResult(func() (bool, error) {
+		store := s2.State()
+		out, err := store.NodePoolByName(nil, np1.Name)
+		return out != nil, err
+	}, func(err error) {
+		t.Fatalf("should replicate node pool")
+	})
+
+	// Create a job in the node pool on the non-authoritative region
+	job := mock.SystemJob()
+	job.NodePool = np1.Name
+	index, _ = s1.State().LatestIndex()
+	must.NoError(t, s2.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
+
+	// Deleting the node pool should fail
+	req := &structs.NodePoolDeleteRequest{
+		Names: []string{np1.Name},
+		WriteRequest: structs.WriteRequest{
+			Region:    s1.Region(),
+			AuthToken: root.SecretID,
+		},
+	}
+	var resp structs.GenericResponse
+	err := msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp)
+	must.ErrorContains(t, err, fmt.Sprintf(
+		"node pool %q has non-terminal jobs in regions: [%s]", np1.Name, s2.Region()))
+
+	// Stop the job, and now deleting the node pool should work
+	job = job.Copy()
+	job.Stop = true
+	index, _ = s1.State().LatestIndex()
+	index++
+	must.NoError(t, s2.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp))
+
+	// Wait for the node pool deletion to replicate
+	testutil.WaitForResult(func() (bool, error) {
+		store := s2.State()
+		out, err := store.NodePoolByName(nil, np1.Name)
+		return out == nil, err
+	}, func(err error) {
+		t.Fatalf("should replicate node pool deletion")
+	})
+}
+
 func TestNodePoolEndpoint_ListJobs_ACLs(t *testing.T) {
 	ci.Parallel(t)