client: new consul_hook (#18557)

This PR introduces a new allocrunner-level consul_hook which iterates over
services and tasks, if their provider is consul, fetches consul tokens for all of
them, stores them in AllocHookResources and in task secret dirs.

Ref: hashicorp/team-nomad#404

---------

Co-authored-by: Tim Gross <tgross@hashicorp.com>
This commit is contained in:
Piotr Kazmierczak
2023-09-29 17:41:48 +02:00
committed by GitHub
parent 0a75a42d94
commit 5dab41881b
9 changed files with 405 additions and 43 deletions

View File

@@ -8,7 +8,6 @@ import (
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
@@ -120,6 +119,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
ar.runnerHooks = []interfaces.RunnerHook{
newIdentityHook(hookLogger, ar.widmgr),
newAllocDirHook(hookLogger, ar.allocDir),
newConsulHook(hookLogger, ar.alloc, ar.allocDir, ar.widmgr, ar.clientConfig.GetConsulConfigs(hookLogger), ar.hookResources),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newCPUPartsHook(hookLogger, ar.partitions, alloc),

View File

@@ -0,0 +1,247 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package allocrunner
import (
"fmt"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/widmgr"
"github.com/hashicorp/nomad/nomad/structs"
structsc "github.com/hashicorp/nomad/nomad/structs/config"
)
const (
// consulServicesAuthMethodName is the JWT auth method name that has to be
// configured in Consul in order to authenticate Nomad services.
consulServicesAuthMethodName = "nomad-workloads"
// consulTasksAuthMethodName the JWT auth method name that has to be
// configured in Consul in order to authenticate Nomad tasks (used by
// templates).
consulTasksAuthMethodName = "nomad-tasks"
)
type consulHook struct {
alloc *structs.Allocation
allocdir *allocdir.AllocDir
widmgr widmgr.IdentityManager
consulConfigs map[string]*structsc.ConsulConfig
hookResources *cstructs.AllocHookResources
logger log.Logger
}
func newConsulHook(logger log.Logger, alloc *structs.Allocation,
allocdir *allocdir.AllocDir,
widmgr widmgr.IdentityManager,
consulConfigs map[string]*structsc.ConsulConfig,
hookResources *cstructs.AllocHookResources,
) *consulHook {
h := &consulHook{
alloc: alloc,
allocdir: allocdir,
widmgr: widmgr,
consulConfigs: consulConfigs,
hookResources: hookResources,
}
h.logger = logger.Named(h.Name())
return h
}
func (*consulHook) Name() string {
return "consul"
}
func (h *consulHook) Prerun() error {
job := h.alloc.Job
if job == nil {
// this is always a programming error
err := fmt.Errorf("alloc %v does not have a job", h.alloc.Name)
h.logger.Error(err.Error())
return err
}
mErr := multierror.Error{}
// tokens are a map of Consul cluster to service identity name to Consul
// ACL token
tokens := map[string]map[string]string{}
for _, tg := range job.TaskGroups {
if err := h.prepareConsulTokensForServices(tg.Services, tokens); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
for _, task := range tg.Tasks {
if err := h.prepareConsulTokensForServices(task.Services, tokens); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := h.prepareConsulTokensForTask(job, task, tokens); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
// write the tokens to hookResources
h.hookResources.SetConsulTokens(tokens)
return mErr.ErrorOrNil()
}
func (h *consulHook) prepareConsulTokensForTask(job *structs.Job, task *structs.Task, tokens map[string]map[string]string) error {
// if UseIdentity is unset of set to false, quit
// FIXME Fetch from Task.Consul.Cluster once #18557 is in
consulConfig := h.consulConfigs[structs.ConsulDefaultCluster]
if consulConfig.UseIdentity == nil || !*consulConfig.UseIdentity {
return nil
}
// default identity
ti := widmgr.TaskIdentity{
TaskName: task.Name,
IdentityName: task.Identity.Name,
}
req, err := h.prepareConsulClientReq(ti, consulTasksAuthMethodName)
if err != nil {
return err
}
jwt, err := h.widmgr.Get(ti)
if err != nil {
h.logger.Error("error getting signed identity", "error", err)
return err
}
req[task.Identity.Name] = consul.JWTLoginRequest{
JWT: jwt.JWT,
AuthMethodName: consulTasksAuthMethodName,
}
// FIXME Fetch from Task.Consul.Cluster once #18557 is in
if err := h.getConsulTokens(structs.ConsulDefaultCluster, task.Identity.Name, tokens, req); err != nil {
return err
}
// alt identities
mErr := multierror.Error{}
for _, i := range task.Identities {
ti := widmgr.TaskIdentity{
TaskName: task.Name,
IdentityName: i.Name,
}
req, err := h.prepareConsulClientReq(ti, consulTasksAuthMethodName)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
jwt, err := h.widmgr.Get(ti)
if err != nil {
h.logger.Error("error getting signed identity", "error", err)
mErr.Errors = append(mErr.Errors, err)
continue
}
req[task.Identity.Name] = consul.JWTLoginRequest{
JWT: jwt.JWT,
AuthMethodName: consulTasksAuthMethodName,
}
// FIXME Fetch from Task.Consul.Cluster once #18557 is in
if err := h.getConsulTokens(structs.ConsulDefaultCluster, ti.IdentityName, tokens, req); err != nil {
return err
}
}
return mErr.ErrorOrNil()
}
func (h *consulHook) prepareConsulTokensForServices(services []*structs.Service, tokens map[string]map[string]string) error {
if len(services) == 0 {
return nil
}
mErr := multierror.Error{}
for _, service := range services {
// see if maybe we can quit early
if service == nil || !service.IsConsul() {
continue
}
if service.Identity == nil {
continue
}
ti := widmgr.TaskIdentity{
TaskName: service.TaskName,
IdentityName: service.Identity.Name,
}
req, err := h.prepareConsulClientReq(ti, consulServicesAuthMethodName)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// in case no service needs a consul token
if len(req) == 0 {
continue
}
if err := h.getConsulTokens(service.Cluster, service.Identity.Name, tokens, req); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
}
return mErr.ErrorOrNil()
}
func (h *consulHook) getConsulTokens(cluster, identityName string, tokens map[string]map[string]string, req map[string]consul.JWTLoginRequest) error {
// Consul auth
consulConf, ok := h.consulConfigs[cluster]
if !ok {
return fmt.Errorf("unable to find configuration for consul cluster %v", cluster)
}
client, err := consul.NewConsulClient(consulConf, h.logger)
if err != nil {
return err
}
// get consul acl tokens
t, err := client.DeriveSITokenWithJWT(req)
if err != nil {
return err
}
if tokens[cluster] == nil {
tokens[cluster] = map[string]string{}
}
tokens[cluster][identityName] = t[identityName]
return nil
}
func (h *consulHook) prepareConsulClientReq(identity widmgr.TaskIdentity, authMethodName string) (map[string]consul.JWTLoginRequest, error) {
req := map[string]consul.JWTLoginRequest{}
jwt, err := h.widmgr.Get(identity)
if err != nil {
h.logger.Error("error getting signed identity", "error", err)
return req, err
}
req[identity.IdentityName] = consul.JWTLoginRequest{
JWT: jwt.JWT,
AuthMethodName: authMethodName,
}
return req, nil
}

View File

@@ -10,7 +10,6 @@ import (
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/widmgr"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
@@ -68,7 +67,7 @@ func TestIdentityHook_Prerun(t *testing.T) {
must.NoError(t, hook.Prerun())
time.Sleep(time.Second) // give goroutines a moment to run
sid, err := hook.widmgr.Get(cstructs.TaskIdentity{
sid, err := hook.widmgr.Get(widmgr.TaskIdentity{
TaskName: task.Name,
IdentityName: task.Identities[0].Name},
)

View File

@@ -0,0 +1,68 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package taskrunner
import (
"context"
"fmt"
"os"
"path/filepath"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// consulTokenFilePrefix is the begging of the name of the file holding the
// Consul SI token inside the task's secret directory. Full name of the file is
// always consulTokenFilePrefix_identityName
consulTokenFilePrefix = "nomad_consul"
// consulTokenFilePerms is the level of file permissions granted on the file in
// the secrets directory for the task
consulTokenFilePerms = 0440
)
type consulHook struct {
task *structs.Task
tokenDir string
hookResources *cstructs.AllocHookResources
logger log.Logger
}
func newConsulHook(logger log.Logger, tr *TaskRunner, hookResources *cstructs.AllocHookResources) *consulHook {
h := &consulHook{
task: tr.Task(),
tokenDir: tr.taskDir.SecretsDir,
hookResources: hookResources,
}
h.logger = logger.Named(h.Name())
return h
}
func (*consulHook) Name() string {
return "consul"
}
func (h *consulHook) Prestart(context.Context, *interfaces.TaskPrestartRequest, *interfaces.TaskPrestartResponse) error {
mErr := multierror.Error{}
tokens := h.hookResources.GetConsulTokens()
// Write tokens to tasks' secret dirs
for cluster, t := range tokens {
for identity, token := range t {
filename := fmt.Sprintf("%s_%s_%s", consulTokenFilePrefix, cluster, identity)
tokenPath := filepath.Join(h.tokenDir, filename)
if err := os.WriteFile(tokenPath, []byte(token), consulTokenFilePerms); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to write Consul SI token: %w", err))
}
}
}
return mErr.ErrorOrNil()
}

View File

@@ -11,7 +11,6 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/widmgr"
"github.com/hashicorp/nomad/helper/users"
@@ -83,7 +82,7 @@ func (h *identityHook) Prestart(context.Context, *interfaces.TaskPrestartRequest
}
func (h *identityHook) watchIdentity(wid *structs.WorkloadIdentity) {
id := cstructs.TaskIdentity{TaskName: h.task.Name, IdentityName: wid.Name}
id := widmgr.TaskIdentity{TaskName: h.task.Name, IdentityName: wid.Name}
signedIdentitiesChan, stopWatching := h.widmgr.Watch(id)
defer stopWatching()

View File

@@ -41,13 +41,12 @@ type SupportedProxiesAPI interface {
// JWTLoginRequest is an object representing a login request with JWT
type JWTLoginRequest struct {
JWT string
Role string
AuthMethodName string
}
// ConsulClient is the interface that the nomad client uses to interact with
// Client is the interface that the nomad client uses to interact with
// Consul.
type ConsulClient interface {
type Client interface {
// DeriveSITokenWithJWT logs into Consul using JWT and retrieves a Consul
// SI ACL token.
DeriveSITokenWithJWT(map[string]JWTLoginRequest) (map[string]string, error)
@@ -71,11 +70,6 @@ func NewConsulClient(config *config.ConsulConfig, logger hclog.Logger) (*consulC
logger = logger.Named("consul")
// if UseIdentity is unset of set to false, return an empty client
if config.UseIdentity == nil || !*config.UseIdentity {
return nil, nil
}
c := &consulClient{
config: config,
logger: logger,
@@ -102,7 +96,7 @@ func NewConsulClient(config *config.ConsulConfig, logger hclog.Logger) (*consulC
}
// DeriveSITokenWithJWT takes a JWT from request and returns a consul token for
// each workload in the request
// each identity in the request
func (c *consulClient) DeriveSITokenWithJWT(reqs map[string]JWTLoginRequest) (map[string]string, error) {
tokens := make(map[string]string, len(reqs))
var mErr *multierror.Error

View File

@@ -10,24 +10,21 @@ import (
"github.com/hashicorp/nomad/helper"
)
type TaskIdentity struct {
TaskName string
IdentityName string
}
// AllocHookResources contains data that is provided by AllocRunner Hooks for
// consumption by TaskRunners. This should be instantiated once in the
// AllocRunner and then only accessed via getters and setters that hold the
// lock.
type AllocHookResources struct {
csiMounts map[string]*csimanager.MountInfo
csiMounts map[string]*csimanager.MountInfo
consulTokens map[string]map[string]string // Consul cluster -> service identity -> token
mu sync.RWMutex
}
func NewAllocHookResources() *AllocHookResources {
return &AllocHookResources{
csiMounts: map[string]*csimanager.MountInfo{},
csiMounts: map[string]*csimanager.MountInfo{},
consulTokens: map[string]map[string]string{},
}
}
@@ -48,3 +45,24 @@ func (a *AllocHookResources) SetCSIMounts(m map[string]*csimanager.MountInfo) {
a.csiMounts = m
}
// GetConsulTokens returns all the Consul tokens previously written by the
// consul allocrunner hook
func (a *AllocHookResources) GetConsulTokens() map[string]map[string]string {
a.mu.RLock()
defer a.mu.RUnlock()
return a.consulTokens
}
// SetConsulTokens merges a given map of Consul cluster names to task
// identities to Consul tokens with previously written data. This method is
// called by the allocrunner consul hook.
func (a *AllocHookResources) SetConsulTokens(m map[string]map[string]string) {
a.mu.Lock()
defer a.mu.Unlock()
for k, v := range m {
a.consulTokens[k] = v
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/go-jose/go-jose/v3"
"github.com/go-jose/go-jose/v3/jwt"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -91,3 +90,31 @@ func (m *MockWIDSigner) SignIdentities(minIndex uint64, req []*structs.WorkloadI
}
return swids, nil
}
// MockWIDMgr mocks IdentityManager interface allowing to only get identities
// signed by the mock signer.
type MockWIDMgr struct {
swids map[TaskIdentity]*structs.SignedWorkloadIdentity
}
func NewMockWIDMgr(swids map[TaskIdentity]*structs.SignedWorkloadIdentity) *MockWIDMgr {
return &MockWIDMgr{swids: swids}
}
// Run does not run a renewal loop in this mock
func (m MockWIDMgr) Run() error { return nil }
func (m MockWIDMgr) Get(identity TaskIdentity) (*structs.SignedWorkloadIdentity, error) {
sid, ok := m.swids[identity]
if !ok {
return nil, fmt.Errorf("identity not found")
}
return sid, nil
}
// Watch does not do anything, this mock doesn't support watching.
func (m MockWIDMgr) Watch(identity TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func()) {
return nil, nil
}
func (m MockWIDMgr) Shutdown() {}

View File

@@ -11,17 +11,23 @@ import (
"time"
"github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// TaskIdentity maps the name of the task to the name of a workload identity. Any
// task can have multiple identities.
type TaskIdentity struct {
TaskName string
IdentityName string
}
// IdentityManager defines a manager responsible for signing and renewing
// signed identities. At runtime it is implemented by *widmgr.WIDMgr.
type IdentityManager interface {
Run() error
Get(cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error)
Watch(cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func())
Get(TaskIdentity) (*structs.SignedWorkloadIdentity, error)
Watch(TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func())
Shutdown()
}
@@ -34,12 +40,12 @@ type WIDMgr struct {
// lastToken are the last retrieved signed workload identifiers keyed by
// TaskIdentity
lastToken map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity
lastToken map[TaskIdentity]*structs.SignedWorkloadIdentity
lastTokenLock sync.RWMutex
// watchers is a map of task identities to slices of channels (each identity
// can have multiple watchers)
watchers map[cstructs.TaskIdentity][]chan *structs.SignedWorkloadIdentity
watchers map[TaskIdentity][]chan *structs.SignedWorkloadIdentity
watchersLock sync.Mutex
// minWait is the minimum amount of time to wait before renewing. Settable to
@@ -73,8 +79,8 @@ func NewWIDMgr(signer IdentitySigner, a *structs.Allocation, logger hclog.Logger
widSpecs: widspecs,
signer: signer,
minWait: 10 * time.Second,
lastToken: map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity{},
watchers: map[cstructs.TaskIdentity][]chan *structs.SignedWorkloadIdentity{},
lastToken: map[TaskIdentity]*structs.SignedWorkloadIdentity{},
watchers: map[TaskIdentity][]chan *structs.SignedWorkloadIdentity{},
stopCtx: stopCtx,
stop: stop,
logger: logger.Named("widmgr"),
@@ -92,7 +98,7 @@ func (m *WIDMgr) SetMinWait(t time.Duration) {
// If an error is returned the identities could not be fetched and the renewal
// goroutine was not started.
func (m *WIDMgr) Run() error {
if len(m.widSpecs) == 0 {
if len(m.widSpecs) == 0 && len(m.defaultSignedIdentities) == 0 {
m.logger.Debug("no workload identities to retrieve or renew")
return nil
}
@@ -114,7 +120,7 @@ func (m *WIDMgr) Run() error {
// For retrieving tokens which might be renewed callers should use Watch
// instead to avoid missing new tokens retrieved by Run between Get and Watch
// calls.
func (m *WIDMgr) Get(id cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity, error) {
func (m *WIDMgr) Get(id TaskIdentity) (*structs.SignedWorkloadIdentity, error) {
token := m.get(id)
if token == nil {
// This is an error as every identity should have a token by the time Get
@@ -125,7 +131,7 @@ func (m *WIDMgr) Get(id cstructs.TaskIdentity) (*structs.SignedWorkloadIdentity,
return token, nil
}
func (m *WIDMgr) get(id cstructs.TaskIdentity) *structs.SignedWorkloadIdentity {
func (m *WIDMgr) get(id TaskIdentity) *structs.SignedWorkloadIdentity {
m.lastTokenLock.RLock()
defer m.lastTokenLock.RUnlock()
@@ -137,7 +143,7 @@ func (m *WIDMgr) get(id cstructs.TaskIdentity) *structs.SignedWorkloadIdentity {
//
// The caller must call the returned func to stop watching and ensure the
// watched id actually exists, otherwise the channel never returns a result.
func (m *WIDMgr) Watch(id cstructs.TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func()) {
func (m *WIDMgr) Watch(id TaskIdentity) (<-chan *structs.SignedWorkloadIdentity, func()) {
// If Shutdown has been called return a closed chan
if m.stopCtx.Err() != nil {
c := make(chan *structs.SignedWorkloadIdentity)
@@ -190,9 +196,9 @@ func (m *WIDMgr) Shutdown() {
// getIdentities fetches all signed identities or returns an error.
func (m *WIDMgr) getIdentities() error {
// get the default identity signed by the plan applier
defaultTokens := map[cstructs.TaskIdentity]*structs.SignedWorkloadIdentity{}
defaultTokens := map[TaskIdentity]*structs.SignedWorkloadIdentity{}
for taskName, signature := range m.defaultSignedIdentities {
id := cstructs.TaskIdentity{
id := TaskIdentity{
TaskName: taskName,
IdentityName: "default",
}
@@ -208,7 +214,7 @@ func (m *WIDMgr) getIdentities() error {
}
}
if len(m.widSpecs) == 0 {
if len(m.widSpecs) == 0 && len(defaultTokens) == 0 {
return nil
}
@@ -227,9 +233,13 @@ func (m *WIDMgr) getIdentities() error {
}
// Get signed workload identities
signedWIDs, err := m.signer.SignIdentities(m.minIndex, reqs)
if err != nil {
return err
signedWIDs := []*structs.SignedWorkloadIdentity{}
if len(m.widSpecs) != 0 {
var err error
signedWIDs, err = m.signer.SignIdentities(m.minIndex, reqs)
if err != nil {
return err
}
}
// Store default identity tokens
@@ -239,7 +249,7 @@ func (m *WIDMgr) getIdentities() error {
// Index initial workload identities by name
for _, swid := range signedWIDs {
id := cstructs.TaskIdentity{
id := TaskIdentity{
TaskName: swid.TaskName,
IdentityName: swid.IdentityName,
}
@@ -288,7 +298,7 @@ func (m *WIDMgr) renew() {
}
//FIXME make this less ugly
token := m.get(cstructs.TaskIdentity{
token := m.get(TaskIdentity{
TaskName: taskName,
IdentityName: wid.Name,
})
@@ -356,7 +366,7 @@ func (m *WIDMgr) renew() {
minExp = time.Time{}
for _, token := range tokens {
id := cstructs.TaskIdentity{
id := TaskIdentity{
TaskName: token.TaskName,
IdentityName: token.IdentityName,
}
@@ -384,7 +394,7 @@ func (m *WIDMgr) renew() {
}
// send must be called while holding the m.watchersLock
func (m *WIDMgr) send(id cstructs.TaskIdentity, token *structs.SignedWorkloadIdentity) {
func (m *WIDMgr) send(id TaskIdentity, token *structs.SignedWorkloadIdentity) {
w, ok := m.watchers[id]
if !ok {
// No watchers