From 1e679ccba2f34ce40bd55442382b3b004a5ded2e Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 15 May 2023 10:49:08 -0400 Subject: [PATCH] node pool: initial base work (#17163) Implementation of the base work for the new node pools feature. It includes a new `NodePool` struct and its corresponding state store table. Upon start the state store is populated with two built-in node pools that cannot be modified nor deleted: * `all` is a node pool that always includes all nodes in the cluster. * `default` is the node pool where nodes that don't specify a node pool in their configuration are placed. --- helper/raftutil/msgtypes.go | 2 + nomad/fsm.go | 38 ++ nomad/fsm_test.go | 16 + nomad/mock/mock.go | 11 + nomad/state/schema.go | 22 ++ nomad/state/state_store.go | 8 +- nomad/state/state_store_node_pools.go | 169 +++++++++ nomad/state/state_store_node_pools_test.go | 398 +++++++++++++++++++++ nomad/state/state_store_restore.go | 8 + nomad/structs/node_pool.go | 135 +++++++ nomad/structs/node_pool_test.go | 136 +++++++ nomad/structs/structs.go | 2 + 12 files changed, 943 insertions(+), 2 deletions(-) create mode 100644 nomad/state/state_store_node_pools.go create mode 100644 nomad/state/state_store_node_pools_test.go create mode 100644 nomad/structs/node_pool.go create mode 100644 nomad/structs/node_pool_test.go diff --git a/helper/raftutil/msgtypes.go b/helper/raftutil/msgtypes.go index bee2e35f8..9112eb4a6 100644 --- a/helper/raftutil/msgtypes.go +++ b/helper/raftutil/msgtypes.go @@ -63,6 +63,8 @@ var msgTypeNames = map[structs.MessageType]string{ structs.ACLAuthMethodsDeleteRequestType: "ACLAuthMethodsDeleteRequestType", structs.ACLBindingRulesUpsertRequestType: "ACLBindingRulesUpsertRequestType", structs.ACLBindingRulesDeleteRequestType: "ACLBindingRulesDeleteRequestType", + structs.NodePoolUpsertRequestType: "NodePoolUpsertRequestType", + structs.NodePoolDeleteRequestType: "NodePoolDeleteRequestType", structs.NamespaceUpsertRequestType: 
"NamespaceUpsertRequestType", structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType", } diff --git a/nomad/fsm.go b/nomad/fsm.go index ab5d6c495..7d726cceb 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -64,6 +64,7 @@ const ( ACLRoleSnapshot SnapshotType = 25 ACLAuthMethodSnapshot SnapshotType = 26 ACLBindingRuleSnapshot SnapshotType = 27 + NodePoolSnapshot SnapshotType = 28 // Namespace appliers were moved from enterprise and therefore start at 64 NamespaceSnapshot SnapshotType = 64 @@ -1821,6 +1822,18 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { return err } + case NodePoolSnapshot: + pool := new(structs.NodePool) + + if err := dec.Decode(pool); err != nil { + return err + } + + // Perform the restoration. + if err := restore.NodePoolRestore(pool); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -2279,6 +2292,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistNodePools(sink, encoder); err != nil { + sink.Cancel() + return err + } if err := s.persistJobs(sink, encoder); err != nil { sink.Cancel() return err @@ -2441,6 +2458,27 @@ func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistNodePools(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + // Get all node pools. + ws := memdb.NewWatchSet() + pools, err := s.snap.NodePools(ws) + if err != nil { + return err + } + + // Iterate over all node pools and persist them. 
+ for raw := pools.Next(); raw != nil; raw = pools.Next() { + pool := raw.(*structs.NodePool) + + sink.Write([]byte{byte(NodePoolSnapshot)}) + if err := encoder.Encode(pool); err != nil { + return err + } + } + return nil +} + func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the jobs diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 037b225e1..5d3539316 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2253,6 +2253,22 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) { } } +func TestFSM_SnapshotRestore_NodePools(t *testing.T) { + ci.Parallel(t) + + // Add some state + fsm := testFSM(t) + state := fsm.State() + pool := mock.NodePool() + state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out, _ := state2.NodePoolByName(nil, pool.Name) + must.Eq(t, pool, out) +} + func TestFSM_SnapshotRestore_Jobs(t *testing.T) { ci.Parallel(t) // Add some state diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 21175ea60..432ceea30 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -251,6 +251,17 @@ func Namespace() *structs.Namespace { return ns } +func NodePool() *structs.NodePool { + return &structs.NodePool{ + Name: fmt.Sprintf("pool-%s", uuid.Short()), + Description: "test node pool", + Meta: map[string]string{"team": "test"}, + SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmSpread, + }, + } +} + // ServiceRegistrations generates an array containing two unique service // registrations. 
func ServiceRegistrations() []*structs.ServiceRegistration { diff --git a/nomad/state/schema.go b/nomad/state/schema.go index fbb53212c..fd7b4c819 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -16,6 +16,7 @@ const ( tableIndex = "index" TableNamespaces = "namespaces" + TableNodePools = "node_pools" TableServiceRegistrations = "service_registrations" TableVariables = "variables" TableVariablesQuotas = "variables_quota" @@ -66,6 +67,7 @@ func init() { RegisterSchemaFactories([]SchemaFactory{ indexTableSchema, nodeTableSchema, + nodePoolTableSchema, jobTableSchema, jobSummarySchema, jobVersionSchema, @@ -162,6 +164,26 @@ func nodeTableSchema() *memdb.TableSchema { } } +// nodePoolTableSchema returns the MemDB schema for the node pools table. +// This table is used to store all the node pools registered in the cluster. +func nodePoolTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: TableNodePools, + Indexes: map[string]*memdb.IndexSchema{ + // Name is the primary index used for lookup and is required to be + // unique. + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Name", + }, + }, + }, + } +} + // jobTableSchema returns the MemDB schema for the jobs table. // This table is used to store all the jobs that have been submitted. func jobTableSchema() *memdb.TableSchema { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 679005b74..bf24e902d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -155,9 +155,13 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges) } - // Initialize the state store with the default namespace. + // Initialize the state store with the default namespace and built-in node + // pools. 
if err := s.namespaceInit(); err != nil { - return nil, fmt.Errorf("enterprise state store initialization failed: %v", err) + return nil, fmt.Errorf("namespace state store initialization failed: %v", err) + } + if err := s.nodePoolInit(); err != nil { + return nil, fmt.Errorf("node pool state store initialization failed: %w", err) } return s, nil diff --git a/nomad/state/state_store_node_pools.go b/nomad/state/state_store_node_pools.go new file mode 100644 index 000000000..0a481c024 --- /dev/null +++ b/nomad/state/state_store_node_pools.go @@ -0,0 +1,169 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/structs" +) + +// nodePoolInit creates the built-in node pools that should always be present +// in the cluster. +func (s *StateStore) nodePoolInit() error { + allNodePool := &structs.NodePool{ + Name: structs.NodePoolAll, + Description: structs.NodePoolAllDescription, + } + + defaultNodePool := &structs.NodePool{ + Name: structs.NodePoolDefault, + Description: structs.NodePoolDefaultDescription, + } + + return s.UpsertNodePools( + structs.NodePoolUpsertRequestType, + 1, + []*structs.NodePool{allNodePool, defaultNodePool}, + ) +} + +// NodePools returns an iterator over all node pools. +func (s *StateStore) NodePools(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get(TableNodePools, "id") + if err != nil { + return nil, fmt.Errorf("node pools lookup failed: %w", err) + } + + ws.Add(iter.WatchCh()) + return iter, nil +} + +// NodePoolByName returns the node pool that matches the given name or nil if +// there is no match. 
+func (s *StateStore) NodePoolByName(ws memdb.WatchSet, name string) (*structs.NodePool, error) { + txn := s.db.ReadTxn() + + watchCh, existing, err := txn.FirstWatch(TableNodePools, "id", name) + if err != nil { + return nil, fmt.Errorf("node pool lookup failed: %w", err) + } + ws.Add(watchCh) + + if existing == nil { + return nil, nil + } + + return existing.(*structs.NodePool), nil +} + +// NodePoolsByNamePrefix returns an iterator over all node pools that match +// the given name prefix. +func (s *StateStore) NodePoolsByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get(TableNodePools, "id_prefix", namePrefix) + if err != nil { + return nil, fmt.Errorf("node pool lookup failed: %w", err) + } + ws.Add(iter.WatchCh()) + + return iter, nil +} + +// UpsertNodePools inserts or updates the given set of node pools. +func (s *StateStore) UpsertNodePools(msgType structs.MessageType, index uint64, pools []*structs.NodePool) error { + txn := s.db.WriteTxnMsgT(msgType, index) + defer txn.Abort() + + for _, pool := range pools { + if err := s.upsertNodePoolTxn(txn, index, pool); err != nil { + return err + } + } + + if err := txn.Insert("index", &IndexEntry{TableNodePools, index}); err != nil { + return fmt.Errorf("index update failed: %w", err) + } + + return txn.Commit() +} + +func (s *StateStore) upsertNodePoolTxn(txn *txn, index uint64, pool *structs.NodePool) error { + if pool == nil { + return nil + } + + existing, err := txn.First(TableNodePools, "id", pool.Name) + if err != nil { + return fmt.Errorf("node pool lookup failed: %w", err) + } + + if existing != nil { + // Prevent changes to built-in node pools. 
+ if pool.IsBuiltIn() { + return fmt.Errorf("modifying node pool %q is not allowed", pool.Name) + } + + exist := existing.(*structs.NodePool) + pool.CreateIndex = exist.CreateIndex + pool.ModifyIndex = index + } else { + pool.CreateIndex = index + pool.ModifyIndex = index + } + + if err := txn.Insert(TableNodePools, pool); err != nil { + return fmt.Errorf("node pool insert failed: %w", err) + } + + return nil +} + +// DeleteNodePools removes the given set of node pools. +func (s *StateStore) DeleteNodePools(msgType structs.MessageType, index uint64, names []string) error { + txn := s.db.WriteTxnMsgT(msgType, index) + defer txn.Abort() + + for _, n := range names { + if err := s.deleteNodePoolTxn(txn, index, n); err != nil { + return err + } + } + + // Update index table. + if err := txn.Insert("index", &IndexEntry{TableNodePools, index}); err != nil { + return fmt.Errorf("index update failed: %w", err) + } + + return txn.Commit() +} + +func (s *StateStore) deleteNodePoolTxn(txn *txn, index uint64, name string) error { + // Check if node pool exists. + existing, err := txn.First(TableNodePools, "id", name) + if err != nil { + return fmt.Errorf("node pool lookup failed: %w", err) + } + if existing == nil { + return fmt.Errorf("node pool %s not found", name) + } + + pool := existing.(*structs.NodePool) + + // Prevent deletion of built-in node pools. + if pool.IsBuiltIn() { + return fmt.Errorf("deleting node pool %q is not allowed", pool.Name) + } + + // Delete node pool. + if err := txn.Delete(TableNodePools, pool); err != nil { + return fmt.Errorf("node pool deletion failed: %w", err) + } + + return nil +} diff --git a/nomad/state/state_store_node_pools_test.go b/nomad/state/state_store_node_pools_test.go new file mode 100644 index 000000000..b208800db --- /dev/null +++ b/nomad/state/state_store_node_pools_test.go @@ -0,0 +1,398 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +package state + +import ( + "testing" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" +) + +func TestStateStore_NodePools(t *testing.T) { + ci.Parallel(t) + + // Create test node pools. + state := testStateStore(t) + pools := make([]*structs.NodePool, 10) + for i := 0; i < 10; i++ { + pools[i] = mock.NodePool() + } + must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)) + + // Create a watchset to test that getters don't cause it to fire. + ws := memdb.NewWatchSet() + iter, err := state.NodePools(ws) + must.NoError(t, err) + + // Verify all pools are returned. + foundBuiltIn := map[string]bool{ + structs.NodePoolAll: false, + structs.NodePoolDefault: false, + } + got := make([]*structs.NodePool, 0, 10) + + for raw := iter.Next(); raw != nil; raw = iter.Next() { + pool := raw.(*structs.NodePool) + + if pool.IsBuiltIn() { + must.False(t, foundBuiltIn[pool.Name]) + foundBuiltIn[pool.Name] = true + continue + } + + got = append(got, pool) + } + + must.SliceContainsAll(t, got, pools) + must.False(t, watchFired(ws)) + for k, v := range foundBuiltIn { + must.True(t, v, must.Sprintf("built-in pool %q not found", k)) + } +} + +func TestStateStore_NodePool_ByName(t *testing.T) { + ci.Parallel(t) + + // Create test node pools. 
+ state := testStateStore(t) + pools := make([]*structs.NodePool, 10) + for i := 0; i < 10; i++ { + pools[i] = mock.NodePool() + } + must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)) + + testCases := []struct { + name string + pool string + expected *structs.NodePool + }{ + { + name: "find a pool", + pool: pools[3].Name, + expected: pools[3], + }, + { + name: "find built-in pool all", + pool: structs.NodePoolAll, + expected: &structs.NodePool{ + Name: structs.NodePoolAll, + Description: structs.NodePoolAllDescription, + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + { + name: "find built-in pool default", + pool: structs.NodePoolDefault, + expected: &structs.NodePool{ + Name: structs.NodePoolDefault, + Description: structs.NodePoolDefaultDescription, + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + { + name: "pool not found", + pool: "no-pool", + expected: nil, + }, + { + name: "must be exact match", + pool: pools[2].Name[:4], + expected: nil, + }, + { + name: "empty search", + pool: "", + expected: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ws := memdb.NewWatchSet() + got, err := state.NodePoolByName(ws, tc.pool) + + must.NoError(t, err) + must.Eq(t, tc.expected, got) + must.False(t, watchFired(ws)) + }) + } +} + +func TestStateStore_NodePool_ByNamePrefix(t *testing.T) { + ci.Parallel(t) + + // Create test node pools. 
+ state := testStateStore(t) + existingPools := []*structs.NodePool{ + {Name: "prod-1"}, + {Name: "prod-2"}, + {Name: "prod-3"}, + {Name: "dev-1"}, + {Name: "dev-2"}, + {Name: "qa"}, + } + err := state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, existingPools) + must.NoError(t, err) + + testCases := []struct { + name string + prefix string + expected []string + }{ + { + name: "multiple prefix match", + prefix: "prod", + expected: []string{"prod-1", "prod-2", "prod-3"}, + }, + { + name: "single prefix match", + prefix: "qa", + expected: []string{"qa"}, + }, + { + name: "no match", + prefix: "nope", + expected: []string{}, + }, + { + name: "empty prefix", + prefix: "", + expected: []string{ + "all", + "default", + "prod-1", + "prod-2", + "prod-3", + "dev-1", + "dev-2", + "qa", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ws := memdb.NewWatchSet() + iter, err := state.NodePoolsByNamePrefix(ws, tc.prefix) + must.NoError(t, err) + + got := []string{} + for raw := iter.Next(); raw != nil; raw = iter.Next() { + got = append(got, raw.(*structs.NodePool).Name) + } + must.SliceContainsAll(t, tc.expected, got) + }) + } +} + +func TestStateStore_NodePool_Upsert(t *testing.T) { + ci.Parallel(t) + + existingPools := make([]*structs.NodePool, 10) + for i := 0; i < 10; i++ { + existingPools[i] = mock.NodePool() + } + + testCases := []struct { + name string + input []*structs.NodePool + expectedErr string + }{ + { + name: "add single pool", + input: []*structs.NodePool{ + mock.NodePool(), + }, + }, + { + name: "add multiple pools", + input: []*structs.NodePool{ + mock.NodePool(), + mock.NodePool(), + mock.NodePool(), + }, + }, + { + name: "update existing pools", + input: []*structs.NodePool{ + { + Name: existingPools[0].Name, + Description: "updated", + Meta: map[string]string{ + "updated": "true", + }, + SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack, + }, + 
}, + { + Name: existingPools[1].Name, + Description: "use global scheduler config", + }, + }, + }, + { + name: "update with nil", + input: []*structs.NodePool{ + nil, + }, + }, + { + name: "empty name", + input: []*structs.NodePool{ + { + Name: "", + }, + }, + expectedErr: "missing primary index", + }, + { + name: "update bulit-in pool all", + input: []*structs.NodePool{ + { + Name: structs.NodePoolAll, + Description: "changed", + }, + }, + expectedErr: "not allowed", + }, + { + name: "update built-in pool default", + input: []*structs.NodePool{ + { + Name: structs.NodePoolDefault, + Description: "changed", + }, + }, + expectedErr: "not allowed", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create test pools. + state := testStateStore(t) + must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, existingPools)) + + // Update pools from test case. + err := state.UpsertNodePools(structs.MsgTypeTestSetup, 1001, tc.input) + + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + } else { + must.NoError(t, err) + + ws := memdb.NewWatchSet() + for _, pool := range tc.input { + if pool == nil { + continue + } + + got, err := state.NodePoolByName(ws, pool.Name) + must.NoError(t, err) + must.Eq(t, pool, got) + } + } + }) + } +} + +func TestStateStore_NodePool_Delete(t *testing.T) { + ci.Parallel(t) + + pools := make([]*structs.NodePool, 10) + for i := 0; i < 10; i++ { + pools[i] = mock.NodePool() + } + + testCases := []struct { + name string + del []string + expectedErr string + }{ + { + name: "delete one", + del: []string{pools[0].Name}, + }, + { + name: "delete multiple", + del: []string{pools[0].Name, pools[3].Name}, + }, + { + name: "delete non-existing", + del: []string{"nope"}, + expectedErr: "not found", + }, + { + name: "delete is atomic", + del: []string{pools[0].Name, "nope"}, + expectedErr: "not found", + }, + { + name: "delete built-in pool all", + del: []string{structs.NodePoolAll}, + 
expectedErr: "not allowed", + }, + { + name: "delete built-in pool default", + del: []string{structs.NodePoolDefault}, + expectedErr: "not allowed", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + state := testStateStore(t) + must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools)) + + err := state.DeleteNodePools(structs.MsgTypeTestSetup, 1001, tc.del) + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + + // Make sure delete is atomic and nothing is removed if an + // error happens. + for _, p := range pools { + got, err := state.NodePoolByName(nil, p.Name) + must.NoError(t, err) + must.Eq(t, p, got) + } + } else { + must.NoError(t, err) + + // Check that the node pools are deleted. + for _, p := range tc.del { + got, err := state.NodePoolByName(nil, p) + must.NoError(t, err) + must.Nil(t, got) + } + } + }) + } +} + +func TestStateStore_NodePool_Restore(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + pool := mock.NodePool() + + restore, err := state.Restore() + must.NoError(t, err) + + err = restore.NodePoolRestore(pool) + must.NoError(t, err) + + restore.Commit() + + ws := memdb.NewWatchSet() + out, err := state.NodePoolByName(ws, pool.Name) + must.NoError(t, err) + must.Eq(t, out, pool) +} diff --git a/nomad/state/state_store_restore.go b/nomad/state/state_store_restore.go index 340d83550..254b767c7 100644 --- a/nomad/state/state_store_restore.go +++ b/nomad/state/state_store_restore.go @@ -34,6 +34,14 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error { return nil } +// NodePoolRestore is used to restore a node pool +func (r *StateRestore) NodePoolRestore(pool *structs.NodePool) error { + if err := r.txn.Insert(TableNodePools, pool); err != nil { + return fmt.Errorf("node pool insert failed: %v", err) + } + return nil +} + // JobRestore is used to restore a job func (r *StateRestore) JobRestore(job *structs.Job) error { if err := r.txn.Insert("jobs", 
job); err != nil { diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go new file mode 100644 index 000000000..37189ac69 --- /dev/null +++ b/nomad/structs/node_pool.go @@ -0,0 +1,135 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package structs + +import ( + "fmt" + "regexp" + + "github.com/hashicorp/go-multierror" + "golang.org/x/exp/maps" +) + +const ( + // NodePoolAll is a built-in node pool that always includes all nodes in + // the cluster. + NodePoolAll = "all" + NodePoolAllDescription = "Node pool with all nodes in the cluster." + + // NodePoolDefault is a built-in node pool for nodes that don't specify a + // node pool in their configuration. + NodePoolDefault = "default" + NodePoolDefaultDescription = "Default node pool." + + // maxNodePoolDescriptionLength is the maximum length allowed for a node + // pool description. + maxNodePoolDescriptionLength = 256 +) + +var ( + // validNodePoolName is the rule used to validate a node pool name. + validNodePoolName = regexp.MustCompile("^[a-zA-Z0-9-_]{1,128}$") +) + +// NodePool allows partitioning infrastructure +type NodePool struct { + // Name is the node pool name. It must be unique. + Name string + + // Description is the human-friendly description of the node pool. + Description string + + // Meta is a set of user-provided metadata for the node pool. + Meta map[string]string + + // SchedulerConfiguration is the scheduler configuration specific to the + // node pool. + SchedulerConfiguration *NodePoolSchedulerConfiguration + + // Raft indexes. + CreateIndex uint64 + ModifyIndex uint64 +} + +// Validate returns an error if the node pool is invalid. 
+func (n *NodePool) Validate() error { + var mErr *multierror.Error + + if !validNodePoolName.MatchString(n.Name) { + mErr = multierror.Append(mErr, fmt.Errorf("invalid name %q, must match regex %s", n.Name, validNodePoolName)) + } + if len(n.Description) > maxNodePoolDescriptionLength { + mErr = multierror.Append(mErr, fmt.Errorf("description longer than %d", maxNodePoolDescriptionLength)) + } + + mErr = multierror.Append(mErr, n.SchedulerConfiguration.Validate()) + + return mErr.ErrorOrNil() +} + +// Copy returns a deep copy of the node pool. +func (n *NodePool) Copy() *NodePool { + if n == nil { + return nil + } + + nc := new(NodePool) + *nc = *n + nc.Meta = maps.Clone(nc.Meta) + nc.SchedulerConfiguration = nc.SchedulerConfiguration.Copy() + + return nc +} + +// IsBuiltIn returns true if the node pool is one of the built-in pools. +// +// Built-in node pools are created automatically by Nomad and can never be +// deleted or modified so they are always present in the cluster. +func (n *NodePool) IsBuiltIn() bool { + switch n.Name { + case NodePoolAll, NodePoolDefault: + return true + default: + return false + } +} + +// NodePoolSchedulerConfiguration is the scheduler configuration applied to a +// node pool. +type NodePoolSchedulerConfiguration struct { + + // SchedulerAlgorithm is the scheduling algorithm to use for the pool. + // If not defined, the global cluster scheduling algorithm is used. + SchedulerAlgorithm SchedulerAlgorithm `hcl:"scheduler_algorithm"` +} + +// Copy returns a deep copy of the node pool scheduler configuration. +func (n *NodePoolSchedulerConfiguration) Copy() *NodePoolSchedulerConfiguration { + if n == nil { + return nil + } + + nc := new(NodePoolSchedulerConfiguration) + *nc = *n + + return nc +} + +// Validate returns an error if the node pool scheduler configuration is +// invalid. 
+func (n *NodePoolSchedulerConfiguration) Validate() error { + if n == nil { + return nil + } + + var mErr *multierror.Error + + switch n.SchedulerAlgorithm { + case "", SchedulerAlgorithmBinpack, SchedulerAlgorithmSpread: + default: + mErr = multierror.Append(mErr, fmt.Errorf("invalid scheduler algorithm %q", n.SchedulerAlgorithm)) + } + + return mErr.ErrorOrNil() +} diff --git a/nomad/structs/node_pool_test.go b/nomad/structs/node_pool_test.go new file mode 100644 index 000000000..54c2b1e1d --- /dev/null +++ b/nomad/structs/node_pool_test.go @@ -0,0 +1,136 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package structs + +import ( + "strings" + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func TestNodePool_Copy(t *testing.T) { + ci.Parallel(t) + + pool := &NodePool{ + Name: "original", + Description: "original node pool", + Meta: map[string]string{"original": "true"}, + SchedulerConfiguration: &NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + } + poolCopy := pool.Copy() + poolCopy.Name = "copy" + poolCopy.Description = "copy of original pool" + poolCopy.Meta["original"] = "false" + poolCopy.Meta["new_key"] = "true" + poolCopy.SchedulerConfiguration.SchedulerAlgorithm = SchedulerAlgorithmBinpack + + must.NotEq(t, pool, poolCopy) + must.NotEq(t, pool.Meta, poolCopy.Meta) + must.NotEq(t, pool.SchedulerConfiguration, poolCopy.SchedulerConfiguration) +} + +func TestNodePool_Validate(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + pool *NodePool + expectedErr string + }{ + { + name: "valid pool", + pool: &NodePool{ + Name: "valid", + Description: "ok", + }, + }, + { + name: "invalid pool name character", + pool: &NodePool{ + Name: "not-valid-😢", + }, + expectedErr: "invalid name", + }, + { + name: "missing pool name", + pool: &NodePool{ + Name: "", + }, + expectedErr: "invalid name", + }, + { + name: "invalid pool description", + pool: 
&NodePool{ + Name: "valid", + Description: strings.Repeat("a", 300), + }, + expectedErr: "description longer", + }, + { + name: "invalid scheduling algorithm", + pool: &NodePool{ + Name: "valid", + SchedulerConfiguration: &NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: "invalid", + }, + }, + expectedErr: "invalid scheduler algorithm", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.pool.Validate() + + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + } else { + must.NoError(t, err) + } + }) + } +} + +func TestNodePool_IsBuiltIn(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + pool *NodePool + builtIn bool + }{ + { + name: "all", + pool: &NodePool{ + Name: NodePoolAll, + }, + builtIn: true, + }, + { + name: "default", + pool: &NodePool{ + Name: NodePoolDefault, + }, + builtIn: true, + }, + { + name: "not built-in", + pool: &NodePool{ + Name: "not-built-in", + }, + builtIn: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := tc.pool.IsBuiltIn() + must.Eq(t, tc.builtIn, got) + }) + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fb46cb679..b318e31dc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -125,6 +125,8 @@ const ( ACLAuthMethodsDeleteRequestType MessageType = 56 ACLBindingRulesUpsertRequestType MessageType = 57 ACLBindingRulesDeleteRequestType MessageType = 58 + NodePoolUpsertRequestType MessageType = 59 + NodePoolDeleteRequestType MessageType = 60 // Namespace types were moved from enterprise and therefore start at 64 NamespaceUpsertRequestType MessageType = 64