scheduler: adds implicit constraint for secrets plugin node attributes (#26303)

Michael Smithhisler
2025-07-21 11:25:01 -04:00
parent 6dcd155bf8
commit ac32b0864d
7 changed files with 242 additions and 0 deletions
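In brief: when a task declares a secrets provider, the scheduler now treats that as an implicit constraint that a candidate node must expose the fingerprinted attribute plugins.secrets.<provider>.version; nodes without it are filtered with "missing secrets provider". The sketch below is illustrative only and not part of this commit; the helper and variable names (nodeHasSecretsProviders, nodeAttrs, providers) are hypothetical, while the attribute format and the filter message are taken from the diff.

// Illustrative sketch, not part of this commit: approximates the implicit
// constraint the scheduler applies. Names below are hypothetical.
package main

import "fmt"

// nodeHasSecretsProviders reports whether a node advertises every secrets
// provider required by a task group, via the fingerprinted attribute
// "plugins.secrets.<provider>.version".
func nodeHasSecretsProviders(nodeAttrs map[string]string, providers []string) bool {
	for _, p := range providers {
		if _, ok := nodeAttrs[fmt.Sprintf("plugins.secrets.%s.version", p)]; !ok {
			return false // the node would be filtered as "missing secrets provider"
		}
	}
	return true
}

func main() {
	attrs := map[string]string{"plugins.secrets.vault.version": "1.0.0"}
	fmt.Println(nodeHasSecretsProviders(attrs, []string{"vault"})) // true
	fmt.Println(nodeHasSecretsProviders(attrs, []string{"foo"}))   // false
}

In the diff itself this check is implemented by SecretsProviderChecker.hasSecrets and wired into both GenericStack and SystemStack; the built-in nomad and vault providers are fingerprinted on every client.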

View File

@@ -43,6 +43,7 @@ var (
"nomad": NewNomadFingerprint,
"plugins_cni": NewPluginsCNIFingerprint,
"host_volume_plugins": NewPluginsHostVolumeFingerprint,
"secrets": NewPluginsSecretsFingerprint,
"signal": NewSignalFingerprint,
"storage": NewStorageFingerprint,
"vault": NewVaultFingerprint,

View File

@@ -0,0 +1,33 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package fingerprint

import (
	"time"

	"github.com/hashicorp/go-hclog"
)

type SecretsPluginFingerprint struct {
	logger hclog.Logger
}

func NewPluginsSecretsFingerprint(logger hclog.Logger) Fingerprint {
	return &SecretsPluginFingerprint{
		logger: logger.Named("secrets_plugins"),
	}
}

func (s *SecretsPluginFingerprint) Fingerprint(request *FingerprintRequest, response *FingerprintResponse) error {
	// Add builtin secrets providers
	response.AddAttribute("plugins.secrets.nomad.version", "1.0.0")
	response.AddAttribute("plugins.secrets.vault.version", "1.0.0")

	response.Detected = true
	return nil
}

func (s *SecretsPluginFingerprint) Periodic() (bool, time.Duration) {
	return false, 0
}

View File

@@ -0,0 +1,24 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package fingerprint

import (
	"testing"

	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/shoenig/test/must"
)

func TestPluginsSecretsFingerprint(t *testing.T) {
	fp := NewPluginsSecretsFingerprint(testlog.HCLogger(t))

	resp := FingerprintResponse{}
	err := fp.Fingerprint(&FingerprintRequest{}, &resp)
	must.NoError(t, err)

	must.True(t, resp.Detected)
	must.MapContainsKeys(t, resp.Attributes, []string{
		"plugins.secrets.nomad.version",
		"plugins.secrets.vault.version",
	})
}

View File

@@ -35,6 +35,7 @@ const (
FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released"
FilterConstraintDrivers = "missing drivers"
FilterConstraintDevices = "missing devices"
FilterConstraintSecrets = "missing secrets provider"
FilterConstraintsCSIPluginTopology = "did not meet topology requirement"
)
@@ -161,6 +162,48 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
	return NewStaticIterator(ctx, nodes)
}

// SecretsProviderChecker is a FeasibilityChecker which returns whether a node
// provides the secrets providers required by a task group.
type SecretsProviderChecker struct {
	ctx     Context
	secrets map[string]struct{}
}

func NewSecretsProviderChecker(ctx Context, secrets map[string]struct{}) *SecretsProviderChecker {
	return &SecretsProviderChecker{
		ctx:     ctx,
		secrets: secrets,
	}
}

func (s *SecretsProviderChecker) SetSecrets(secrets map[string]struct{}) {
	s.secrets = secrets
}

func (s *SecretsProviderChecker) Feasible(option *structs.Node) bool {
	// Use this node if possible
	if s.hasSecrets(option) {
		return true
	}
	s.ctx.Metrics().FilterNode(option, FilterConstraintSecrets)
	return false
}

// hasSecrets is used to check if the node has all the secrets providers
// required by this task group. Secrets providers are registered as node
// attributes "plugins.secrets.*.version".
func (s *SecretsProviderChecker) hasSecrets(option *structs.Node) bool {
	for secret := range s.secrets {
		secretStr := fmt.Sprintf("plugins.secrets.%s.version", secret)
		_, ok := option.Attributes[secretStr]
		if !ok {
			return false
		}
	}
	return true
}
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {

View File

@@ -88,6 +88,59 @@ func TestRandomIterator(t *testing.T) {
}
func TestSecretsChecker(t *testing.T) {
	ci.Parallel(t)

	_, ctx := MockContext(t)
	nodes := []*structs.Node{
		mock.Node(),
		mock.Node(),
		mock.Node(),
	}
	nodes[1].Attributes["plugins.secrets.foo.version"] = "1.0.0"
	nodes[2].Attributes["plugins.secrets.bar.version"] = "1.0.0"

	m := map[string]struct{}{
		"foo": {},
	}
	checker := NewSecretsProviderChecker(ctx, m)

	cases := []struct {
		Node    *structs.Node
		Secrets map[string]struct{}
		Result  bool
	}{
		{
			Node: nodes[0],
			Secrets: map[string]struct{}{
				"foo": {},
			},
			Result: false,
		},
		{
			Node: nodes[1],
			Secrets: map[string]struct{}{
				"foo": {},
			},
			Result: true,
		},
		{
			Node: nodes[2],
			Secrets: map[string]struct{}{
				"foo": {},
			},
			Result: false,
		},
	}

	for i, c := range cases {
		checker.SetSecrets(c.Secrets)
		if act := checker.Feasible(c.Node); act != c.Result {
			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
		}
	}
}
func TestHostVolumeChecker_Static(t *testing.T) {
ci.Parallel(t)

View File

@@ -61,6 +61,7 @@ type GenericStack struct {
	taskGroupHostVolumes       *HostVolumeChecker
	taskGroupCSIVolumes        *CSIVolumeChecker
	taskGroupNetwork           *NetworkChecker
	taskGroupSecrets           *SecretsProviderChecker
	distinctHostsConstraint    *DistinctHostsIterator
	distinctPropertyConstraint *DistinctPropertyIterator
@@ -164,6 +165,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
	if len(tg.Networks) > 0 {
		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
	}

	s.taskGroupSecrets.SetSecrets(tgConstr.Secrets)

	s.distinctHostsConstraint.SetTaskGroup(tg)
	s.distinctPropertyConstraint.SetTaskGroup(tg)
	s.wrappedChecks.SetTaskGroup(tg.Name)
@@ -218,6 +220,7 @@ type SystemStack struct {
	taskGroupHostVolumes       *HostVolumeChecker
	taskGroupCSIVolumes        *CSIVolumeChecker
	taskGroupNetwork           *NetworkChecker
	taskGroupSecrets           *SecretsProviderChecker
	distinctPropertyConstraint *DistinctPropertyIterator
	binPack                    *BinPackIterator
@@ -258,6 +261,9 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
	// Filter on available client networks
	s.taskGroupNetwork = NewNetworkChecker(ctx)

	// Filter on task group secrets
	s.taskGroupSecrets = NewSecretsProviderChecker(ctx, nil)

	// Create the feasibility wrapper which wraps all feasibility checks in
	// which feasibility checking can be skipped if the computed node class has
	// previously been marked as eligible or ineligible. Generally this will be
@@ -268,6 +274,7 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
		s.taskGroupConstraint,
		s.taskGroupDevices,
		s.taskGroupNetwork,
		s.taskGroupSecrets,
	}

	avail := []FeasibilityChecker{
		s.taskGroupHostVolumes,
@@ -359,6 +366,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
	if len(tg.Networks) > 0 {
		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
	}

	s.taskGroupSecrets.SetSecrets(tgConstr.Secrets)

	s.wrappedChecks.SetTaskGroup(tg.Name)
	s.distinctPropertyConstraint.SetTaskGroup(tg)
	s.binPack.SetTaskGroup(tg)
@@ -409,6 +417,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
	// Filter on available client networks
	s.taskGroupNetwork = NewNetworkChecker(ctx)

	// Filter on task group secrets
	s.taskGroupSecrets = NewSecretsProviderChecker(ctx, nil)

	// Create the feasibility wrapper which wraps all feasibility checks in
	// which feasibility checking can be skipped if the computed node class has
	// previously been marked as eligible or ineligible. Generally this will be
@@ -419,6 +430,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
		s.taskGroupConstraint,
		s.taskGroupDevices,
		s.taskGroupNetwork,
		s.taskGroupSecrets,
	}

	avail := []FeasibilityChecker{
		s.taskGroupHostVolumes,
@@ -480,12 +492,17 @@ func TaskGroupConstraints(tg *structs.TaskGroup) TgConstrainTuple {
	c := TgConstrainTuple{
		Constraints: make([]*structs.Constraint, 0, len(tg.Constraints)),
		Drivers:     make(map[string]struct{}),
		Secrets:     make(map[string]struct{}),
	}

	c.Constraints = append(c.Constraints, tg.Constraints...)
	for _, task := range tg.Tasks {
		c.Drivers[task.Driver] = struct{}{}
		c.Constraints = append(c.Constraints, task.Constraints...)

		for _, s := range task.Secrets {
			c.Secrets[s.Provider] = struct{}{}
		}
	}

	return c
@@ -498,4 +515,7 @@ type TgConstrainTuple struct {
	// The set of required Drivers within the task group.
	Drivers map[string]struct{}

	// The secrets providers used by the task group.
	Secrets map[string]struct{}
}

View File

@@ -220,6 +220,36 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) {
must.Eq(t, zero, node.Node)
}
func TestServiceStack_Select_Secrets(t *testing.T) {
	ci.Parallel(t)

	_, ctx := MockContext(t)
	nodes := []*structs.Node{
		mock.Node(),
		mock.Node(),
	}
	zero := nodes[0]
	zero.Attributes["plugins.secrets.foo.version"] = "0.0.1"
	must.NoError(t, zero.ComputeClass())

	stack := NewGenericStack(false, ctx)
	stack.SetNodes(nodes)

	job := mock.Job()
	job.TaskGroups[0].Tasks[0].Secrets = []*structs.Secret{
		{
			Provider: "foo",
		},
	}
	stack.SetJob(job)

	selectOptions := &SelectOptions{}
	node := stack.Select(job.TaskGroups[0], selectOptions)
	must.NotNil(t, node, must.Sprintf("missing node %#v", ctx.Metrics()))
	must.Eq(t, zero, node.Node)
}
func TestServiceStack_Select_HostVolume(t *testing.T) {
ci.Parallel(t)
@@ -571,6 +601,44 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) {
must.Nil(t, node)
}
func TestSystemStack_Select_Secrets(t *testing.T) {
	ci.Parallel(t)

	_, ctx := MockContext(t)
	nodes := []*structs.Node{
		mock.Node(),
		mock.Node(),
	}
	zero := nodes[0]
	zero.Attributes["plugins.secrets.foo.version"] = "0.0.1"
	must.NoError(t, zero.ComputeClass())

	stack := NewSystemStack(false, ctx)
	stack.SetNodes(nodes)

	job := mock.Job()
	job.TaskGroups[0].Tasks[0].Secrets = []*structs.Secret{
		{
			Provider: "foo",
		},
	}
	stack.SetJob(job)

	selectOptions := &SelectOptions{}
	node := stack.Select(job.TaskGroups[0], selectOptions)
	must.NotNil(t, node, must.Sprintf("missing node %#v", ctx.Metrics()))
	must.Eq(t, zero, node.Node)

	// Remove the secrets plugin attribute and verify no node is selected
	delete(zero.Attributes, "plugins.secrets.foo.version")
	must.NoError(t, zero.ComputeClass())

	stack = NewSystemStack(false, ctx)
	stack.SetNodes(nodes)
	stack.SetJob(job)
	node = stack.Select(job.TaskGroups[0], selectOptions)
	must.Nil(t, node)
}
func TestSystemStack_Select_ConstraintFilter(t *testing.T) {
ci.Parallel(t)