mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
node pool: initial base work (#17163)
Implementation of the base work for the new node pools feature. It includes a new `NodePool` struct and its corresponding state store table. Upon start the state store is populated with two built-in node pools that cannot be modified nor deleted: * `all` is a node pool that always includes all nodes in the cluster. * `default` is the node pool where nodes that don't specify a node pool in their configuration are placed.
This commit is contained in:
@@ -63,6 +63,8 @@ var msgTypeNames = map[structs.MessageType]string{
|
||||
structs.ACLAuthMethodsDeleteRequestType: "ACLAuthMethodsDeleteRequestType",
|
||||
structs.ACLBindingRulesUpsertRequestType: "ACLBindingRulesUpsertRequestType",
|
||||
structs.ACLBindingRulesDeleteRequestType: "ACLBindingRulesDeleteRequestType",
|
||||
structs.NodePoolUpsertRequestType: "NodePoolUpsertRequestType",
|
||||
structs.NodePoolDeleteRequestType: "NodePoolDeleteRequestType",
|
||||
structs.NamespaceUpsertRequestType: "NamespaceUpsertRequestType",
|
||||
structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType",
|
||||
}
|
||||
|
||||
38
nomad/fsm.go
38
nomad/fsm.go
@@ -64,6 +64,7 @@ const (
|
||||
ACLRoleSnapshot SnapshotType = 25
|
||||
ACLAuthMethodSnapshot SnapshotType = 26
|
||||
ACLBindingRuleSnapshot SnapshotType = 27
|
||||
NodePoolSnapshot SnapshotType = 28
|
||||
|
||||
// Namespace appliers were moved from enterprise and therefore start at 64
|
||||
NamespaceSnapshot SnapshotType = 64
|
||||
@@ -1821,6 +1822,18 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error {
|
||||
return err
|
||||
}
|
||||
|
||||
case NodePoolSnapshot:
|
||||
pool := new(structs.NodePool)
|
||||
|
||||
if err := dec.Decode(pool); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Perform the restoration.
|
||||
if err := restore.NodePoolRestore(pool); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
// Check if this is an enterprise only object being restored
|
||||
restorer, ok := n.enterpriseRestorers[snapType]
|
||||
@@ -2279,6 +2292,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
if err := s.persistNodePools(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
if err := s.persistJobs(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
@@ -2441,6 +2458,27 @@ func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) persistNodePools(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all node pools.
|
||||
ws := memdb.NewWatchSet()
|
||||
pools, err := s.snap.NodePools(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Iterate over all node pools and persist them.
|
||||
for raw := pools.Next(); raw != nil; raw = pools.Next() {
|
||||
pool := raw.(*structs.NodePool)
|
||||
|
||||
sink.Write([]byte{byte(NodePoolSnapshot)})
|
||||
if err := encoder.Encode(pool); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all the jobs
|
||||
|
||||
@@ -2253,6 +2253,22 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_NodePools(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
state := fsm.State()
|
||||
pool := mock.NodePool()
|
||||
state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
|
||||
|
||||
// Verify the contents
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
out, _ := state2.NodePoolByName(nil, pool.Name)
|
||||
must.Eq(t, pool, out)
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_Jobs(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
// Add some state
|
||||
|
||||
@@ -251,6 +251,17 @@ func Namespace() *structs.Namespace {
|
||||
return ns
|
||||
}
|
||||
|
||||
func NodePool() *structs.NodePool {
|
||||
return &structs.NodePool{
|
||||
Name: fmt.Sprintf("pool-%s", uuid.Short()),
|
||||
Description: "test node pool",
|
||||
Meta: map[string]string{"team": "test"},
|
||||
SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
|
||||
SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceRegistrations generates an array containing two unique service
|
||||
// registrations.
|
||||
func ServiceRegistrations() []*structs.ServiceRegistration {
|
||||
|
||||
@@ -16,6 +16,7 @@ const (
|
||||
tableIndex = "index"
|
||||
|
||||
TableNamespaces = "namespaces"
|
||||
TableNodePools = "node_pools"
|
||||
TableServiceRegistrations = "service_registrations"
|
||||
TableVariables = "variables"
|
||||
TableVariablesQuotas = "variables_quota"
|
||||
@@ -66,6 +67,7 @@ func init() {
|
||||
RegisterSchemaFactories([]SchemaFactory{
|
||||
indexTableSchema,
|
||||
nodeTableSchema,
|
||||
nodePoolTableSchema,
|
||||
jobTableSchema,
|
||||
jobSummarySchema,
|
||||
jobVersionSchema,
|
||||
@@ -162,6 +164,26 @@ func nodeTableSchema() *memdb.TableSchema {
|
||||
}
|
||||
}
|
||||
|
||||
// nodePoolTableSchema returns the MemDB schema for the node pools table.
|
||||
// This table is used to store all the node pools registered in the cluster.
|
||||
func nodePoolTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: TableNodePools,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
// Name is the primary index used for lookup and is required to be
|
||||
// unique.
|
||||
"id": {
|
||||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Name",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// jobTableSchema returns the MemDB schema for the jobs table.
|
||||
// This table is used to store all the jobs that have been submitted.
|
||||
func jobTableSchema() *memdb.TableSchema {
|
||||
|
||||
@@ -155,9 +155,13 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
|
||||
s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
|
||||
}
|
||||
|
||||
// Initialize the state store with the default namespace.
|
||||
// Initialize the state store with the default namespace and built-in node
|
||||
// pools.
|
||||
if err := s.namespaceInit(); err != nil {
|
||||
return nil, fmt.Errorf("enterprise state store initialization failed: %v", err)
|
||||
return nil, fmt.Errorf("namespace state store initialization failed: %v", err)
|
||||
}
|
||||
if err := s.nodePoolInit(); err != nil {
|
||||
return nil, fmt.Errorf("node pool state store initialization failed: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
|
||||
169
nomad/state/state_store_node_pools.go
Normal file
169
nomad/state/state_store_node_pools.go
Normal file
@@ -0,0 +1,169 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// nodePoolInit creates the built-in node pools that should always be present
|
||||
// in the cluster.
|
||||
func (s *StateStore) nodePoolInit() error {
|
||||
allNodePool := &structs.NodePool{
|
||||
Name: structs.NodePoolAll,
|
||||
Description: structs.NodePoolAllDescription,
|
||||
}
|
||||
|
||||
defaultNodePool := &structs.NodePool{
|
||||
Name: structs.NodePoolDefault,
|
||||
Description: structs.NodePoolDefaultDescription,
|
||||
}
|
||||
|
||||
return s.UpsertNodePools(
|
||||
structs.NodePoolUpsertRequestType,
|
||||
1,
|
||||
[]*structs.NodePool{allNodePool, defaultNodePool},
|
||||
)
|
||||
}
|
||||
|
||||
// NodePools returns an iterator over all node pools.
|
||||
func (s *StateStore) NodePools(ws memdb.WatchSet) (memdb.ResultIterator, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
iter, err := txn.Get(TableNodePools, "id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node pools lookup failed: %w", err)
|
||||
}
|
||||
|
||||
ws.Add(iter.WatchCh())
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// NodePoolByName returns the node pool that matches the given name or nil if
|
||||
// there is no match.
|
||||
func (s *StateStore) NodePoolByName(ws memdb.WatchSet, name string) (*structs.NodePool, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
watchCh, existing, err := txn.FirstWatch(TableNodePools, "id", name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node pool lookup failed: %w", err)
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
|
||||
if existing == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return existing.(*structs.NodePool), nil
|
||||
}
|
||||
|
||||
// NodePoolsByNamePrefix returns an interator over all node pools that match
|
||||
// the given name prefix.
|
||||
func (s *StateStore) NodePoolsByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
iter, err := txn.Get(TableNodePools, "id_prefix", namePrefix)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node pool lookup failed: %w", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// UpsertNodePools inserts or updates the given set of node pools.
|
||||
func (s *StateStore) UpsertNodePools(msgType structs.MessageType, index uint64, pools []*structs.NodePool) error {
|
||||
txn := s.db.WriteTxnMsgT(msgType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
for _, pool := range pools {
|
||||
if err := s.upsertNodePoolTxn(txn, index, pool); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{TableNodePools, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %w", err)
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
func (s *StateStore) upsertNodePoolTxn(txn *txn, index uint64, pool *structs.NodePool) error {
|
||||
if pool == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
existing, err := txn.First(TableNodePools, "id", pool.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node pool lookup failed: %w", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
// Prevent changes to built-in node pools.
|
||||
if pool.IsBuiltIn() {
|
||||
return fmt.Errorf("modifying node pool %q is not allowed", pool.Name)
|
||||
}
|
||||
|
||||
exist := existing.(*structs.NodePool)
|
||||
pool.CreateIndex = exist.CreateIndex
|
||||
pool.ModifyIndex = index
|
||||
} else {
|
||||
pool.CreateIndex = index
|
||||
pool.ModifyIndex = index
|
||||
}
|
||||
|
||||
if err := txn.Insert(TableNodePools, pool); err != nil {
|
||||
return fmt.Errorf("node pool insert failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteNodePools removes the given set of node pools.
|
||||
func (s *StateStore) DeleteNodePools(msgType structs.MessageType, index uint64, names []string) error {
|
||||
txn := s.db.WriteTxnMsgT(msgType, index)
|
||||
defer txn.Abort()
|
||||
|
||||
for _, n := range names {
|
||||
if err := s.deleteNodePoolTxn(txn, index, n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Update index table.
|
||||
if err := txn.Insert("index", &IndexEntry{TableNodePools, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %w", err)
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
func (s *StateStore) deleteNodePoolTxn(txn *txn, index uint64, name string) error {
|
||||
// Check if node pool exists.
|
||||
existing, err := txn.First(TableNodePools, "id", name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node pool lookup failed: %w", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return fmt.Errorf("node pool %s not found", name)
|
||||
}
|
||||
|
||||
pool := existing.(*structs.NodePool)
|
||||
|
||||
// Prevent deletion of built-in node pools.
|
||||
if pool.IsBuiltIn() {
|
||||
return fmt.Errorf("deleting node pool %q is not allowed", pool.Name)
|
||||
}
|
||||
|
||||
// Delete node pool.
|
||||
if err := txn.Delete(TableNodePools, pool); err != nil {
|
||||
return fmt.Errorf("node pool deletion failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
398
nomad/state/state_store_node_pools_test.go
Normal file
398
nomad/state/state_store_node_pools_test.go
Normal file
@@ -0,0 +1,398 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/shoenig/test/must"
|
||||
)
|
||||
|
||||
func TestStateStore_NodePools(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
// Create test node pools.
|
||||
state := testStateStore(t)
|
||||
pools := make([]*structs.NodePool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
pools[i] = mock.NodePool()
|
||||
}
|
||||
must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools))
|
||||
|
||||
// Create a watchset to test that getters don't cause it to fire.
|
||||
ws := memdb.NewWatchSet()
|
||||
iter, err := state.NodePools(ws)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Verify all pools are returned.
|
||||
foundBuiltIn := map[string]bool{
|
||||
structs.NodePoolAll: false,
|
||||
structs.NodePoolDefault: false,
|
||||
}
|
||||
got := make([]*structs.NodePool, 0, 10)
|
||||
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
pool := raw.(*structs.NodePool)
|
||||
|
||||
if pool.IsBuiltIn() {
|
||||
must.False(t, foundBuiltIn[pool.Name])
|
||||
foundBuiltIn[pool.Name] = true
|
||||
continue
|
||||
}
|
||||
|
||||
got = append(got, pool)
|
||||
}
|
||||
|
||||
must.SliceContainsAll(t, got, pools)
|
||||
must.False(t, watchFired(ws))
|
||||
for k, v := range foundBuiltIn {
|
||||
must.True(t, v, must.Sprintf("built-in pool %q not found", k))
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_NodePool_ByName(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
// Create test node pools.
|
||||
state := testStateStore(t)
|
||||
pools := make([]*structs.NodePool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
pools[i] = mock.NodePool()
|
||||
}
|
||||
must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools))
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
pool string
|
||||
expected *structs.NodePool
|
||||
}{
|
||||
{
|
||||
name: "find a pool",
|
||||
pool: pools[3].Name,
|
||||
expected: pools[3],
|
||||
},
|
||||
{
|
||||
name: "find built-in pool all",
|
||||
pool: structs.NodePoolAll,
|
||||
expected: &structs.NodePool{
|
||||
Name: structs.NodePoolAll,
|
||||
Description: structs.NodePoolAllDescription,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "find built-in pool default",
|
||||
pool: structs.NodePoolDefault,
|
||||
expected: &structs.NodePool{
|
||||
Name: structs.NodePoolDefault,
|
||||
Description: structs.NodePoolDefaultDescription,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "pool not found",
|
||||
pool: "no-pool",
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
name: "must be exact match",
|
||||
pool: pools[2].Name[:4],
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
name: "empty search",
|
||||
pool: "",
|
||||
expected: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ws := memdb.NewWatchSet()
|
||||
got, err := state.NodePoolByName(ws, tc.pool)
|
||||
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, tc.expected, got)
|
||||
must.False(t, watchFired(ws))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_NodePool_ByNamePrefix(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
// Create test node pools.
|
||||
state := testStateStore(t)
|
||||
existingPools := []*structs.NodePool{
|
||||
{Name: "prod-1"},
|
||||
{Name: "prod-2"},
|
||||
{Name: "prod-3"},
|
||||
{Name: "dev-1"},
|
||||
{Name: "dev-2"},
|
||||
{Name: "qa"},
|
||||
}
|
||||
err := state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, existingPools)
|
||||
must.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
prefix string
|
||||
expected []string
|
||||
}{
|
||||
{
|
||||
name: "multiple prefix match",
|
||||
prefix: "prod",
|
||||
expected: []string{"prod-1", "prod-2", "prod-3"},
|
||||
},
|
||||
{
|
||||
name: "single prefix match",
|
||||
prefix: "qa",
|
||||
expected: []string{"qa"},
|
||||
},
|
||||
{
|
||||
name: "no match",
|
||||
prefix: "nope",
|
||||
expected: []string{},
|
||||
},
|
||||
{
|
||||
name: "empty prefix",
|
||||
prefix: "",
|
||||
expected: []string{
|
||||
"all",
|
||||
"default",
|
||||
"prod-1",
|
||||
"prod-2",
|
||||
"prod-3",
|
||||
"dev-1",
|
||||
"dev-2",
|
||||
"qa",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ws := memdb.NewWatchSet()
|
||||
iter, err := state.NodePoolsByNamePrefix(ws, tc.prefix)
|
||||
must.NoError(t, err)
|
||||
|
||||
got := []string{}
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
got = append(got, raw.(*structs.NodePool).Name)
|
||||
}
|
||||
must.SliceContainsAll(t, tc.expected, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_NodePool_Upsert(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
existingPools := make([]*structs.NodePool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
existingPools[i] = mock.NodePool()
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
input []*structs.NodePool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "add single pool",
|
||||
input: []*structs.NodePool{
|
||||
mock.NodePool(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add multiple pools",
|
||||
input: []*structs.NodePool{
|
||||
mock.NodePool(),
|
||||
mock.NodePool(),
|
||||
mock.NodePool(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "update existing pools",
|
||||
input: []*structs.NodePool{
|
||||
{
|
||||
Name: existingPools[0].Name,
|
||||
Description: "updated",
|
||||
Meta: map[string]string{
|
||||
"updated": "true",
|
||||
},
|
||||
SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
|
||||
SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: existingPools[1].Name,
|
||||
Description: "use global scheduler config",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "update with nil",
|
||||
input: []*structs.NodePool{
|
||||
nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty name",
|
||||
input: []*structs.NodePool{
|
||||
{
|
||||
Name: "",
|
||||
},
|
||||
},
|
||||
expectedErr: "missing primary index",
|
||||
},
|
||||
{
|
||||
name: "update bulit-in pool all",
|
||||
input: []*structs.NodePool{
|
||||
{
|
||||
Name: structs.NodePoolAll,
|
||||
Description: "changed",
|
||||
},
|
||||
},
|
||||
expectedErr: "not allowed",
|
||||
},
|
||||
{
|
||||
name: "update built-in pool default",
|
||||
input: []*structs.NodePool{
|
||||
{
|
||||
Name: structs.NodePoolDefault,
|
||||
Description: "changed",
|
||||
},
|
||||
},
|
||||
expectedErr: "not allowed",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Create test pools.
|
||||
state := testStateStore(t)
|
||||
must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, existingPools))
|
||||
|
||||
// Update pools from test case.
|
||||
err := state.UpsertNodePools(structs.MsgTypeTestSetup, 1001, tc.input)
|
||||
|
||||
if tc.expectedErr != "" {
|
||||
must.ErrorContains(t, err, tc.expectedErr)
|
||||
} else {
|
||||
must.NoError(t, err)
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
for _, pool := range tc.input {
|
||||
if pool == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
got, err := state.NodePoolByName(ws, pool.Name)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, pool, got)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_NodePool_Delete(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
pools := make([]*structs.NodePool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
pools[i] = mock.NodePool()
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
del []string
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "delete one",
|
||||
del: []string{pools[0].Name},
|
||||
},
|
||||
{
|
||||
name: "delete multiple",
|
||||
del: []string{pools[0].Name, pools[3].Name},
|
||||
},
|
||||
{
|
||||
name: "delete non-existing",
|
||||
del: []string{"nope"},
|
||||
expectedErr: "not found",
|
||||
},
|
||||
{
|
||||
name: "delete is atomic",
|
||||
del: []string{pools[0].Name, "nope"},
|
||||
expectedErr: "not found",
|
||||
},
|
||||
{
|
||||
name: "delete built-in pool all",
|
||||
del: []string{structs.NodePoolAll},
|
||||
expectedErr: "not allowed",
|
||||
},
|
||||
{
|
||||
name: "delete built-in pool default",
|
||||
del: []string{structs.NodePoolDefault},
|
||||
expectedErr: "not allowed",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
must.NoError(t, state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools))
|
||||
|
||||
err := state.DeleteNodePools(structs.MsgTypeTestSetup, 1001, tc.del)
|
||||
if tc.expectedErr != "" {
|
||||
must.ErrorContains(t, err, tc.expectedErr)
|
||||
|
||||
// Make sure delete is atomic and nothing is removed if an
|
||||
// error happens.
|
||||
for _, p := range pools {
|
||||
got, err := state.NodePoolByName(nil, p.Name)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, p, got)
|
||||
}
|
||||
} else {
|
||||
must.NoError(t, err)
|
||||
|
||||
// Check that the node pools is deleted.
|
||||
for _, p := range tc.del {
|
||||
got, err := state.NodePoolByName(nil, p)
|
||||
must.NoError(t, err)
|
||||
must.Nil(t, got)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_NodePool_Restore(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
state := testStateStore(t)
|
||||
pool := mock.NodePool()
|
||||
|
||||
restore, err := state.Restore()
|
||||
must.NoError(t, err)
|
||||
|
||||
err = restore.NodePoolRestore(pool)
|
||||
must.NoError(t, err)
|
||||
|
||||
restore.Commit()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := state.NodePoolByName(ws, pool.Name)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, out, pool)
|
||||
}
|
||||
@@ -34,6 +34,14 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NodePoolRestore is used to restore a node pool
|
||||
func (r *StateRestore) NodePoolRestore(pool *structs.NodePool) error {
|
||||
if err := r.txn.Insert(TableNodePools, pool); err != nil {
|
||||
return fmt.Errorf("node pool insert failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// JobRestore is used to restore a job
|
||||
func (r *StateRestore) JobRestore(job *structs.Job) error {
|
||||
if err := r.txn.Insert("jobs", job); err != nil {
|
||||
|
||||
135
nomad/structs/node_pool.go
Normal file
135
nomad/structs/node_pool.go
Normal file
@@ -0,0 +1,135 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package structs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
const (
|
||||
// NodePoolAll is a built-in node pool that always includes all nodes in
|
||||
// the cluster.
|
||||
NodePoolAll = "all"
|
||||
NodePoolAllDescription = "Node pool with all nodes in the cluster."
|
||||
|
||||
// NodePoolDefault is a built-in node pool for nodes that don't specify a
|
||||
// node pool in their configuration.
|
||||
NodePoolDefault = "default"
|
||||
NodePoolDefaultDescription = "Default node pool."
|
||||
|
||||
// maxNodePoolDescriptionLength is the maximum length allowed for a node
|
||||
// pool description.
|
||||
maxNodePoolDescriptionLength = 256
|
||||
)
|
||||
|
||||
var (
|
||||
// validNodePoolName is the rule used to validate a node pool name.
|
||||
validNodePoolName = regexp.MustCompile("^[a-zA-Z0-9-_]{1,128}$")
|
||||
)
|
||||
|
||||
// NodePool allows partioning infrastructure
|
||||
type NodePool struct {
|
||||
// Name is the node pool name. It must be unique.
|
||||
Name string
|
||||
|
||||
// Description is the human-friendly description of the node pool.
|
||||
Description string
|
||||
|
||||
// Meta is a set of user-provided metadata for the node pool.
|
||||
Meta map[string]string
|
||||
|
||||
// SchedulerConfiguration is the scheduler configuration specific to the
|
||||
// node pool.
|
||||
SchedulerConfiguration *NodePoolSchedulerConfiguration
|
||||
|
||||
// Raft indexes.
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
// Validate returns an error if the node pool is invalid.
|
||||
func (n *NodePool) Validate() error {
|
||||
var mErr *multierror.Error
|
||||
|
||||
if !validNodePoolName.MatchString(n.Name) {
|
||||
mErr = multierror.Append(mErr, fmt.Errorf("invalid name %q, must match regex %s", n.Name, validNodePoolName))
|
||||
}
|
||||
if len(n.Description) > maxNodePoolDescriptionLength {
|
||||
mErr = multierror.Append(mErr, fmt.Errorf("description longer than %d", maxNodePoolDescriptionLength))
|
||||
}
|
||||
|
||||
mErr = multierror.Append(mErr, n.SchedulerConfiguration.Validate())
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Copy returns a deep copy of the node pool.
|
||||
func (n *NodePool) Copy() *NodePool {
|
||||
if n == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nc := new(NodePool)
|
||||
*nc = *n
|
||||
nc.Meta = maps.Clone(nc.Meta)
|
||||
nc.SchedulerConfiguration = nc.SchedulerConfiguration.Copy()
|
||||
|
||||
return nc
|
||||
}
|
||||
|
||||
// IsBuiltIn returns true if the node pool is one of the built-in pools.
|
||||
//
|
||||
// Built-in node pools are created automatically by Nomad and can never be
|
||||
// deleted or modified so they are always present in the cluster..
|
||||
func (n *NodePool) IsBuiltIn() bool {
|
||||
switch n.Name {
|
||||
case NodePoolAll, NodePoolDefault:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// NodePoolSchedulerConfiguration is the scheduler confinguration applied to a
|
||||
// node pool.
|
||||
type NodePoolSchedulerConfiguration struct {
|
||||
|
||||
// SchedulerAlgorithm is the scheduling algorithm to use for the pool.
|
||||
// If not defined, the global cluster scheduling algorithm is used.
|
||||
SchedulerAlgorithm SchedulerAlgorithm `hcl:"scheduler_algorithm"`
|
||||
}
|
||||
|
||||
// Copy returns a deep copy of the node pool scheduler configuration.
|
||||
func (n *NodePoolSchedulerConfiguration) Copy() *NodePoolSchedulerConfiguration {
|
||||
if n == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nc := new(NodePoolSchedulerConfiguration)
|
||||
*nc = *n
|
||||
|
||||
return nc
|
||||
}
|
||||
|
||||
// Validate returns an error if the node pool scheduler confinguration is
|
||||
// invalid.
|
||||
func (n *NodePoolSchedulerConfiguration) Validate() error {
|
||||
if n == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var mErr *multierror.Error
|
||||
|
||||
switch n.SchedulerAlgorithm {
|
||||
case "", SchedulerAlgorithmBinpack, SchedulerAlgorithmSpread:
|
||||
default:
|
||||
mErr = multierror.Append(mErr, fmt.Errorf("invalid scheduler algorithm %q", n.SchedulerAlgorithm))
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
136
nomad/structs/node_pool_test.go
Normal file
136
nomad/structs/node_pool_test.go
Normal file
@@ -0,0 +1,136 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package structs
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/shoenig/test/must"
|
||||
)
|
||||
|
||||
func TestNodePool_Copy(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
pool := &NodePool{
|
||||
Name: "original",
|
||||
Description: "original node pool",
|
||||
Meta: map[string]string{"original": "true"},
|
||||
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
|
||||
SchedulerAlgorithm: SchedulerAlgorithmSpread,
|
||||
},
|
||||
}
|
||||
poolCopy := pool.Copy()
|
||||
poolCopy.Name = "copy"
|
||||
poolCopy.Description = "copy of original pool"
|
||||
poolCopy.Meta["original"] = "false"
|
||||
poolCopy.Meta["new_key"] = "true"
|
||||
poolCopy.SchedulerConfiguration.SchedulerAlgorithm = SchedulerAlgorithmBinpack
|
||||
|
||||
must.NotEq(t, pool, poolCopy)
|
||||
must.NotEq(t, pool.Meta, poolCopy.Meta)
|
||||
must.NotEq(t, pool.SchedulerConfiguration, poolCopy.SchedulerConfiguration)
|
||||
}
|
||||
|
||||
func TestNodePool_Validate(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
pool *NodePool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "valid pool",
|
||||
pool: &NodePool{
|
||||
Name: "valid",
|
||||
Description: "ok",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid pool name character",
|
||||
pool: &NodePool{
|
||||
Name: "not-valid-😢",
|
||||
},
|
||||
expectedErr: "invalid name",
|
||||
},
|
||||
{
|
||||
name: "missing pool name",
|
||||
pool: &NodePool{
|
||||
Name: "",
|
||||
},
|
||||
expectedErr: "invalid name",
|
||||
},
|
||||
{
|
||||
name: "invalid pool description",
|
||||
pool: &NodePool{
|
||||
Name: "valid",
|
||||
Description: strings.Repeat("a", 300),
|
||||
},
|
||||
expectedErr: "description longer",
|
||||
},
|
||||
{
|
||||
name: "invalid scheduling algorithm",
|
||||
pool: &NodePool{
|
||||
Name: "valid",
|
||||
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
|
||||
SchedulerAlgorithm: "invalid",
|
||||
},
|
||||
},
|
||||
expectedErr: "invalid scheduler algorithm",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := tc.pool.Validate()
|
||||
|
||||
if tc.expectedErr != "" {
|
||||
must.ErrorContains(t, err, tc.expectedErr)
|
||||
} else {
|
||||
must.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodePool_IsBuiltIn(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
pool *NodePool
|
||||
builtIn bool
|
||||
}{
|
||||
{
|
||||
name: "all",
|
||||
pool: &NodePool{
|
||||
Name: NodePoolAll,
|
||||
},
|
||||
builtIn: true,
|
||||
},
|
||||
{
|
||||
name: "default",
|
||||
pool: &NodePool{
|
||||
Name: NodePoolDefault,
|
||||
},
|
||||
builtIn: true,
|
||||
},
|
||||
{
|
||||
name: "not built-in",
|
||||
pool: &NodePool{
|
||||
Name: "not-built-in",
|
||||
},
|
||||
builtIn: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := tc.pool.IsBuiltIn()
|
||||
must.Eq(t, tc.builtIn, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -125,6 +125,8 @@ const (
|
||||
ACLAuthMethodsDeleteRequestType MessageType = 56
|
||||
ACLBindingRulesUpsertRequestType MessageType = 57
|
||||
ACLBindingRulesDeleteRequestType MessageType = 58
|
||||
NodePoolUpsertRequestType MessageType = 59
|
||||
NodePoolDeleteRequestType MessageType = 60
|
||||
|
||||
// Namespace types were moved from enterprise and therefore start at 64
|
||||
NamespaceUpsertRequestType MessageType = 64
|
||||
|
||||
Reference in New Issue
Block a user