mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 18:05:42 +03:00
client: enable envoy bootstrap hook to set SI token
When creating the envoy bootstrap configuration, we should append the "-token=<token>" argument in the case where the sidsHook placed the token in the secrets directory.
This commit is contained in:
@@ -4,20 +4,28 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var _ interfaces.TaskPrestartHook = &envoyBootstrapHook{}
|
||||
const envoyBootstrapHookName = "envoy_bootstrap"
|
||||
|
||||
type envoyBootstrapHookConfig struct {
|
||||
alloc *structs.Allocation
|
||||
consulHTTPAddr string
|
||||
logger hclog.Logger
|
||||
}
|
||||
|
||||
const (
|
||||
envoyBaseAdminPort = 19000
|
||||
@@ -27,6 +35,7 @@ const (
|
||||
// envoyBootstrapHook writes the bootstrap config for the Connect Envoy proxy
|
||||
// sidecar.
|
||||
type envoyBootstrapHook struct {
|
||||
// alloc is the allocation with the envoy task being bootstrapped.
|
||||
alloc *structs.Allocation
|
||||
|
||||
// Bootstrapping Envoy requires talking directly to Consul to generate
|
||||
@@ -34,20 +43,23 @@ type envoyBootstrapHook struct {
|
||||
// Consul's gRPC endpoint.
|
||||
consulHTTPAddr string
|
||||
|
||||
logger log.Logger
|
||||
// executable is executable file that is consul
|
||||
executable string
|
||||
|
||||
// logger is used to log things
|
||||
logger hclog.Logger
|
||||
}
|
||||
|
||||
func newEnvoyBootstrapHook(alloc *structs.Allocation, consulHTTPAddr string, logger log.Logger) *envoyBootstrapHook {
|
||||
h := &envoyBootstrapHook{
|
||||
alloc: alloc,
|
||||
consulHTTPAddr: consulHTTPAddr,
|
||||
func newEnvoyBootstrapHook(c *envoyBootstrapHookConfig) *envoyBootstrapHook {
|
||||
return &envoyBootstrapHook{
|
||||
alloc: c.alloc,
|
||||
consulHTTPAddr: c.consulHTTPAddr,
|
||||
logger: c.logger.Named(envoyBootstrapHookName),
|
||||
}
|
||||
h.logger = logger.Named(h.Name())
|
||||
return h
|
||||
}
|
||||
|
||||
func (envoyBootstrapHook) Name() string {
|
||||
return "envoy_bootstrap"
|
||||
return envoyBootstrapHookName
|
||||
}
|
||||
|
||||
func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
|
||||
@@ -59,7 +71,7 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
|
||||
|
||||
serviceName := req.Task.Kind.Value()
|
||||
if serviceName == "" {
|
||||
return fmt.Errorf("Connect proxy sidecar does not specify service name")
|
||||
return errors.New("connect proxy sidecar does not specify service name")
|
||||
}
|
||||
|
||||
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
|
||||
@@ -73,13 +85,12 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
|
||||
}
|
||||
|
||||
if service == nil {
|
||||
return fmt.Errorf("Connect proxy sidecar task exists but no services configured with a sidecar")
|
||||
return errors.New("connect proxy sidecar task exists but no services configured with a sidecar")
|
||||
}
|
||||
|
||||
h.logger.Debug("bootstrapping Connect proxy sidecar", "task", req.Task.Name, "service", serviceName)
|
||||
|
||||
//TODO Should connect directly to Consul if the sidecar is running on
|
||||
// the host netns.
|
||||
//TODO Should connect directly to Consul if the sidecar is running on the host netns.
|
||||
grpcAddr := "unix://" + allocdir.AllocGRPCSocket
|
||||
|
||||
// Envoy runs an administrative API on the loopback interface. If multiple sidecars
|
||||
@@ -92,24 +103,36 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
|
||||
|
||||
// Envoy bootstrap configuration may contain a Consul token, so write
|
||||
// it to the secrets directory like Vault tokens.
|
||||
fn := filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")
|
||||
bootstrapFilePath := filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")
|
||||
|
||||
id := agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+tg.Name, service)
|
||||
h.logger.Debug("bootstrapping envoy", "sidecar_for", service.Name, "boostrap_file", fn, "sidecar_for_id", id, "grpc_addr", grpcAddr, "admin_bind", envoyAdminBind)
|
||||
|
||||
h.logger.Debug("bootstrapping envoy", "sidecar_for", service.Name, "bootstrap_file", bootstrapFilePath, "sidecar_for_id", id, "grpc_addr", grpcAddr, "admin_bind", envoyAdminBind)
|
||||
|
||||
siToken, err := h.maybeLoadSIToken(req.Task.Name, req.TaskDir.SecretsDir)
|
||||
if err != nil {
|
||||
h.logger.Error("failed to generate envoy bootstrap config", "sidecar_for", service.Name)
|
||||
return errors.Wrap(err, "failed to generate envoy bootstrap config")
|
||||
}
|
||||
h.logger.Debug("check for SI token for task", "task", req.Task.Name, "exists", siToken != "")
|
||||
|
||||
bootstrapArgs := envoyBootstrapArgs{
|
||||
sidecarFor: id,
|
||||
grpcAddr: grpcAddr,
|
||||
consulHTTPAddr: h.consulHTTPAddr,
|
||||
envoyAdminBind: envoyAdminBind,
|
||||
siToken: siToken,
|
||||
}.args()
|
||||
|
||||
// put old stuff in here
|
||||
// Since Consul services are registered asynchronously with this task
|
||||
// hook running, retry a small number of times with backoff.
|
||||
for tries := 3; ; tries-- {
|
||||
cmd := exec.CommandContext(ctx, "consul", "connect", "envoy",
|
||||
"-grpc-addr", grpcAddr,
|
||||
"-http-addr", h.consulHTTPAddr,
|
||||
"-admin-bind", envoyAdminBind,
|
||||
"-bootstrap",
|
||||
"-sidecar-for", id,
|
||||
)
|
||||
|
||||
cmd := exec.CommandContext(ctx, "consul", bootstrapArgs...)
|
||||
|
||||
// Redirect output to secrets/envoy_bootstrap.json
|
||||
fd, err := os.Create(fn)
|
||||
fd, err := os.Create(bootstrapFilePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating secrets/envoy_bootstrap.json for envoy: %v", err)
|
||||
}
|
||||
@@ -122,7 +145,7 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
|
||||
err = cmd.Run()
|
||||
|
||||
// Close bootstrap.json
|
||||
fd.Close()
|
||||
_ = fd.Close()
|
||||
|
||||
if err == nil {
|
||||
// Happy path! Bootstrap was created, exit.
|
||||
@@ -138,7 +161,7 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *interfaces.TaskP
|
||||
// occurs, and (b) the file will either be rewritten on
|
||||
// retry or eventually garbage collected if the task
|
||||
// fails.
|
||||
os.Remove(fn)
|
||||
_ = os.Remove(bootstrapFilePath)
|
||||
|
||||
// ExitErrors are recoverable since they indicate the
|
||||
// command was runnable but exited with a unsuccessful
|
||||
@@ -174,3 +197,82 @@ func buildEnvoyAdminBind(alloc *structs.Allocation, taskName string) string {
|
||||
}
|
||||
return fmt.Sprintf("localhost:%d", port)
|
||||
}
|
||||
|
||||
func (h *envoyBootstrapHook) writeConfig(filename, config string) error {
|
||||
if err := ioutil.WriteFile(filename, []byte(config), 0440); err != nil {
|
||||
_ = os.Remove(filename)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (_ *envoyBootstrapHook) retry(ctx context.Context) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(2 * time.Second):
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (h *envoyBootstrapHook) execute(cmd *exec.Cmd) (string, error) {
|
||||
var (
|
||||
stdout bytes.Buffer
|
||||
stderr bytes.Buffer
|
||||
)
|
||||
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
_, recoverable := err.(*exec.ExitError)
|
||||
// ExitErrors are recoverable since they indicate the
|
||||
// command was runnable but exited with a unsuccessful
|
||||
// error code.
|
||||
return stderr.String(), structs.NewRecoverableError(err, recoverable)
|
||||
}
|
||||
return stdout.String(), nil
|
||||
}
|
||||
|
||||
type envoyBootstrapArgs struct {
|
||||
sidecarFor string
|
||||
grpcAddr string
|
||||
envoyAdminBind string
|
||||
consulHTTPAddr string
|
||||
siToken string
|
||||
}
|
||||
|
||||
func (e envoyBootstrapArgs) args() []string {
|
||||
arguments := []string{
|
||||
"connect",
|
||||
"envoy",
|
||||
"-grpc-addr", e.grpcAddr,
|
||||
"-http-addr", e.consulHTTPAddr,
|
||||
"-admin-bind", e.envoyAdminBind,
|
||||
"-bootstrap",
|
||||
"-sidecar-for", e.sidecarFor,
|
||||
}
|
||||
if e.siToken != "" {
|
||||
arguments = append(arguments, []string{"-token", e.siToken}...)
|
||||
}
|
||||
return arguments
|
||||
}
|
||||
|
||||
// maybeLoadSIToken reads the SI token saved to disk in the secretes directory
|
||||
// by the service identities prestart hook. This envoy bootstrap hook blocks
|
||||
// until the sids hook completes, so if the SI token is required to exist (i.e.
|
||||
// Consul ACLs are enabled), it will be in place by the time we try to read it.
|
||||
func (h *envoyBootstrapHook) maybeLoadSIToken(task, dir string) (string, error) {
|
||||
tokenPath := filepath.Join(dir, sidsTokenFile)
|
||||
token, err := ioutil.ReadFile(tokenPath)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
h.logger.Error("failed to load SI token", "task", task, "error", err)
|
||||
return "", errors.Wrapf(err, "failed to load SI token for %s", task)
|
||||
}
|
||||
h.logger.Trace("no SI token to load", "task", task)
|
||||
return "", nil // token file does not exist
|
||||
}
|
||||
h.logger.Trace("recovered pre-existing SI token", "task", task)
|
||||
return string(token), nil
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/args"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -24,6 +25,213 @@ import (
|
||||
|
||||
var _ interfaces.TaskPrestartHook = (*envoyBootstrapHook)(nil)
|
||||
|
||||
func writeTmp(t *testing.T, s string, fm os.FileMode) string {
|
||||
dir, err := ioutil.TempDir("", "envoy-")
|
||||
require.NoError(t, err)
|
||||
|
||||
fPath := filepath.Join(dir, sidsTokenFile)
|
||||
err = ioutil.WriteFile(fPath, []byte(s), fm)
|
||||
require.NoError(t, err)
|
||||
|
||||
return dir
|
||||
}
|
||||
|
||||
func TestEnvoyBootstrapHook_maybeLoadSIToken(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("file does not exist", func(t *testing.T) {
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)})
|
||||
config, err := h.maybeLoadSIToken("task1", "/does/not/exist")
|
||||
require.NoError(t, err) // absence of token is not an error
|
||||
require.Equal(t, "", config)
|
||||
})
|
||||
|
||||
t.Run("load token from file", func(t *testing.T) {
|
||||
token := uuid.Generate()
|
||||
f := writeTmp(t, token, 0440)
|
||||
defer cleanupDir(t, f)
|
||||
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)})
|
||||
config, err := h.maybeLoadSIToken("task1", f)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, token, config)
|
||||
})
|
||||
|
||||
t.Run("file is unreadable", func(t *testing.T) {
|
||||
token := uuid.Generate()
|
||||
f := writeTmp(t, token, 0200)
|
||||
defer cleanupDir(t, f)
|
||||
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)})
|
||||
config, err := h.maybeLoadSIToken("task1", f)
|
||||
require.Error(t, err)
|
||||
require.False(t, os.IsNotExist(err))
|
||||
require.Equal(t, "", config)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEnvoyBootstrapHook_envoyBootstrapArgs(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("excluding SI token", func(t *testing.T) {
|
||||
ebArgs := envoyBootstrapArgs{
|
||||
sidecarFor: "s1",
|
||||
grpcAddr: "1.1.1.1",
|
||||
consulHTTPAddr: "2.2.2.2",
|
||||
envoyAdminBind: "localhost:3333",
|
||||
}
|
||||
args := ebArgs.args()
|
||||
require.Equal(t, []string{"connect", "envoy",
|
||||
"-grpc-addr", "1.1.1.1",
|
||||
"-http-addr", "2.2.2.2",
|
||||
"-admin-bind", "localhost:3333",
|
||||
"-bootstrap",
|
||||
"-sidecar-for", "s1",
|
||||
}, args)
|
||||
})
|
||||
|
||||
t.Run("including SI token", func(t *testing.T) {
|
||||
token := uuid.Generate()
|
||||
ebArgs := envoyBootstrapArgs{
|
||||
sidecarFor: "s1",
|
||||
grpcAddr: "1.1.1.1",
|
||||
consulHTTPAddr: "2.2.2.2",
|
||||
envoyAdminBind: "localhost:3333",
|
||||
siToken: token,
|
||||
}
|
||||
args := ebArgs.args()
|
||||
require.Equal(t, []string{"connect", "envoy",
|
||||
"-grpc-addr", "1.1.1.1",
|
||||
"-http-addr", "2.2.2.2",
|
||||
"-admin-bind", "localhost:3333",
|
||||
"-bootstrap",
|
||||
"-sidecar-for", "s1",
|
||||
"-token", token,
|
||||
}, args)
|
||||
})
|
||||
}
|
||||
|
||||
// dig through envoy config to look for consul token
|
||||
type envoyConfig struct {
|
||||
DynamicResources struct {
|
||||
ADSConfig struct {
|
||||
GRPCServices struct {
|
||||
InitialMetadata []struct {
|
||||
Key string `json:"key"`
|
||||
Value string `json:"value"`
|
||||
} `json:"initial_metadata"`
|
||||
} `json:"grpc_services"`
|
||||
} `json:"ads_config"`
|
||||
} `json:"dynamic_resources"`
|
||||
}
|
||||
|
||||
func TestEnvoyBootstrapHook_with_SI_token(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.RequireConsul(t)
|
||||
|
||||
testconsul, err := consultest.NewTestServerConfig(func(c *consultest.TestServerConfig) {
|
||||
// If -v wasn't specified squelch consul logging
|
||||
if !testing.Verbose() {
|
||||
c.Stdout = ioutil.Discard
|
||||
c.Stderr = ioutil.Discard
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("error starting test consul server: %v", err)
|
||||
}
|
||||
defer testconsul.Stop()
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{
|
||||
{
|
||||
Mode: "bridge",
|
||||
IP: "10.0.0.1",
|
||||
DynamicPorts: []structs.Port{
|
||||
{
|
||||
Label: "connect-proxy-foo",
|
||||
Value: 9999,
|
||||
To: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
tg := alloc.Job.TaskGroups[0]
|
||||
tg.Services = []*structs.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
PortLabel: "9999", // Just need a valid port, nothing will bind to it
|
||||
Connect: &structs.ConsulConnect{
|
||||
SidecarService: &structs.ConsulSidecarService{},
|
||||
},
|
||||
},
|
||||
}
|
||||
sidecarTask := &structs.Task{
|
||||
Name: "sidecar",
|
||||
Kind: "connect-proxy:foo",
|
||||
}
|
||||
tg.Tasks = append(tg.Tasks, sidecarTask)
|
||||
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
allocDir, cleanup := allocdir.TestAllocDir(t, logger, "EnvoyBootstrap")
|
||||
defer cleanup()
|
||||
|
||||
// Register Group Services
|
||||
consulConfig := consulapi.DefaultConfig()
|
||||
consulConfig.Address = testconsul.HTTPAddr
|
||||
consulAPIClient, err := consulapi.NewClient(consulConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), logger, true)
|
||||
go consulClient.Run()
|
||||
defer consulClient.Shutdown()
|
||||
require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter())))
|
||||
|
||||
// Run Connect bootstrap Hook
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
|
||||
alloc: alloc,
|
||||
consulHTTPAddr: testconsul.HTTPAddr,
|
||||
logger: logger,
|
||||
})
|
||||
req := &interfaces.TaskPrestartRequest{
|
||||
Task: sidecarTask,
|
||||
TaskDir: allocDir.NewTaskDir(sidecarTask.Name),
|
||||
}
|
||||
require.NoError(t, req.TaskDir.Build(false, nil))
|
||||
|
||||
// Insert service identity token in the secrets directory
|
||||
token := uuid.Generate()
|
||||
siTokenFile := filepath.Join(req.TaskDir.SecretsDir, sidsTokenFile)
|
||||
err = ioutil.WriteFile(siTokenFile, []byte(token), 0440)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := &interfaces.TaskPrestartResponse{}
|
||||
|
||||
// Run the hook
|
||||
require.NoError(t, h.Prestart(context.Background(), req, resp))
|
||||
|
||||
// Assert it is Done
|
||||
require.True(t, resp.Done)
|
||||
|
||||
// Ensure the default path matches
|
||||
env := map[string]string{
|
||||
taskenv.SecretsDir: req.TaskDir.SecretsDir,
|
||||
}
|
||||
f, err := os.Open(args.ReplaceEnv(structs.EnvoyBootstrapPath, env))
|
||||
require.NoError(t, err)
|
||||
defer f.Close()
|
||||
|
||||
// Assert bootstrap configuration is valid json
|
||||
var out envoyConfig
|
||||
require.NoError(t, json.NewDecoder(f).Decode(&out))
|
||||
|
||||
// Assert the SI token got set
|
||||
key := out.DynamicResources.ADSConfig.GRPCServices.InitialMetadata[0].Key
|
||||
value := out.DynamicResources.ADSConfig.GRPCServices.InitialMetadata[0].Value
|
||||
require.Equal(t, "x-consul-token", key)
|
||||
require.Equal(t, token, value)
|
||||
}
|
||||
|
||||
// TestTaskRunner_EnvoyBootstrapHook_Prestart asserts the EnvoyBootstrapHook
|
||||
// creates Envoy's bootstrap.json configuration based on Connect proxy sidecars
|
||||
// registered for the task.
|
||||
@@ -83,13 +291,18 @@ func TestTaskRunner_EnvoyBootstrapHook_Ok(t *testing.T) {
|
||||
consulConfig.Address = testconsul.HTTPAddr
|
||||
consulAPIClient, err := consulapi.NewClient(consulConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), logger, true)
|
||||
go consulClient.Run()
|
||||
defer consulClient.Shutdown()
|
||||
require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter())))
|
||||
|
||||
// Run Connect bootstrap Hook
|
||||
h := newEnvoyBootstrapHook(alloc, testconsul.HTTPAddr, logger)
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
|
||||
alloc: alloc,
|
||||
consulHTTPAddr: testconsul.HTTPAddr,
|
||||
logger: logger,
|
||||
})
|
||||
req := &interfaces.TaskPrestartRequest{
|
||||
Task: sidecarTask,
|
||||
TaskDir: allocDir.NewTaskDir(sidecarTask.Name),
|
||||
@@ -116,8 +329,14 @@ func TestTaskRunner_EnvoyBootstrapHook_Ok(t *testing.T) {
|
||||
defer f.Close()
|
||||
|
||||
// Assert bootstrap configuration is valid json
|
||||
var out map[string]interface{}
|
||||
var out envoyConfig
|
||||
require.NoError(t, json.NewDecoder(f).Decode(&out))
|
||||
|
||||
// Assert no SI token got set
|
||||
key := out.DynamicResources.ADSConfig.GRPCServices.InitialMetadata[0].Key
|
||||
value := out.DynamicResources.ADSConfig.GRPCServices.InitialMetadata[0].Value
|
||||
require.Equal(t, "x-consul-token", key)
|
||||
require.Equal(t, "", value)
|
||||
}
|
||||
|
||||
// TestTaskRunner_EnvoyBootstrapHook_Noop asserts that the Envoy bootstrap hook
|
||||
@@ -134,7 +353,11 @@ func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) {
|
||||
|
||||
// Run Envoy bootstrap Hook. Use invalid Consul address as it should
|
||||
// not get hit.
|
||||
h := newEnvoyBootstrapHook(alloc, "http://127.0.0.2:1", logger)
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
|
||||
alloc: alloc,
|
||||
consulHTTPAddr: "http://127.0.0.2:1",
|
||||
logger: logger,
|
||||
})
|
||||
req := &interfaces.TaskPrestartRequest{
|
||||
Task: task,
|
||||
TaskDir: allocDir.NewTaskDir(task.Name),
|
||||
@@ -214,7 +437,11 @@ func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) {
|
||||
// not running.
|
||||
|
||||
// Run Connect bootstrap Hook
|
||||
h := newEnvoyBootstrapHook(alloc, testconsul.HTTPAddr, logger)
|
||||
h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
|
||||
alloc: alloc,
|
||||
consulHTTPAddr: testconsul.HTTPAddr,
|
||||
logger: logger,
|
||||
})
|
||||
req := &interfaces.TaskPrestartRequest{
|
||||
Task: sidecarTask,
|
||||
TaskDir: allocDir.NewTaskDir(sidecarTask.Name),
|
||||
@@ -225,7 +452,7 @@ func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) {
|
||||
|
||||
// Run the hook
|
||||
err = h.Prestart(context.Background(), req, resp)
|
||||
require.Error(t, err)
|
||||
require.EqualError(t, err, "error creating bootstrap configuration for Connect proxy sidecar: exit status 1")
|
||||
require.True(t, structs.IsRecoverable(err))
|
||||
|
||||
// Assert it is not Done
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
|
||||
"github.com/hashicorp/nomad/client/consul"
|
||||
@@ -51,7 +51,7 @@ type sidsHook struct {
|
||||
alloc *structs.Allocation
|
||||
|
||||
// taskName is the name of the task
|
||||
taskName string
|
||||
task *structs.Task
|
||||
|
||||
// sidsClient is the Consul client [proxy] for requesting SI tokens
|
||||
sidsClient consul.ServiceIdentityAPI
|
||||
@@ -72,7 +72,7 @@ type sidsHook struct {
|
||||
func newSIDSHook(c sidsHookConfig) *sidsHook {
|
||||
return &sidsHook{
|
||||
alloc: c.alloc,
|
||||
taskName: c.task.Name,
|
||||
task: c.task,
|
||||
sidsClient: c.sidsClient,
|
||||
lifecycle: c.lifecycle,
|
||||
logger: c.logger.Named(sidsHookName),
|
||||
@@ -87,13 +87,14 @@ func (h *sidsHook) Name() string {
|
||||
func (h *sidsHook) Prestart(
|
||||
ctx context.Context,
|
||||
req *interfaces.TaskPrestartRequest,
|
||||
_ *interfaces.TaskPrestartResponse) error {
|
||||
resp *interfaces.TaskPrestartResponse) error {
|
||||
|
||||
h.lock.Lock()
|
||||
defer h.lock.Unlock()
|
||||
|
||||
// do nothing if we have already done things
|
||||
if h.earlyExit() {
|
||||
resp.Done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -113,6 +114,9 @@ func (h *sidsHook) Prestart(
|
||||
}
|
||||
}
|
||||
|
||||
h.logger.Info("derived SI token", "task", h.task.Name, "si_task", h.task.Kind.Value())
|
||||
|
||||
resp.Done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -145,12 +149,13 @@ func (h *sidsHook) recoverToken(dir string) (string, error) {
|
||||
token, err := ioutil.ReadFile(tokenPath)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
h.logger.Error("failed to recover SI token", "error", err)
|
||||
return "", errors.Wrap(err, "failed to recover SI token")
|
||||
}
|
||||
h.logger.Trace("no pre-existing SI token to recover", "task", h.taskName)
|
||||
h.logger.Trace("no pre-existing SI token to recover", "task", h.task.Name)
|
||||
return "", nil // token file does not exist yet
|
||||
}
|
||||
h.logger.Trace("recovered pre-existing SI token", "task", h.taskName)
|
||||
h.logger.Trace("recovered pre-existing SI token", "task", h.task.Name)
|
||||
return string(token), nil
|
||||
}
|
||||
|
||||
@@ -185,25 +190,30 @@ func (h *sidsHook) kill(ctx context.Context, err error) {
|
||||
|
||||
// tryDerive loops forever until a token is created, or ctx is done.
|
||||
func (h *sidsHook) tryDerive(ctx context.Context, ch chan<- string) {
|
||||
// Derive the SI token using the name of the proxied / native task, not the
|
||||
// name of the literal sidecar task. The virtual ACL policy of the SI token
|
||||
// is oriented this way.
|
||||
siTaskName := h.task.Kind.Value()
|
||||
|
||||
for attempt := 0; backoff(ctx, attempt); attempt++ {
|
||||
|
||||
tokens, err := h.sidsClient.DeriveSITokens(h.alloc, []string{h.taskName})
|
||||
tokens, err := h.sidsClient.DeriveSITokens(h.alloc, []string{siTaskName})
|
||||
|
||||
switch {
|
||||
|
||||
case err == nil:
|
||||
// nothing broke and we can return the token for the task
|
||||
ch <- tokens[h.taskName]
|
||||
ch <- tokens[siTaskName]
|
||||
return
|
||||
|
||||
case structs.IsServerSide(err):
|
||||
// the error is known to be a server problem, just die
|
||||
h.logger.Error("failed to derive SI token", "error", err, "server_side", true)
|
||||
h.logger.Error("failed to derive SI token", "error", err, "task", h.task.Name, "si_task", siTaskName, "server_side", true)
|
||||
h.kill(ctx, errors.Wrap(err, "consul: failed to derive SI token"))
|
||||
|
||||
case !structs.IsRecoverable(err):
|
||||
// the error is known not to be recoverable, just die
|
||||
h.logger.Error("failed to derive SI token", "error", err, "recoverable", false)
|
||||
h.logger.Error("failed to derive SI token", "error", err, "task", h.task.Name, "si_task", siTaskName, "recoverable", false)
|
||||
h.kill(ctx, errors.Wrap(err, "consul: failed to derive SI token"))
|
||||
|
||||
default:
|
||||
@@ -213,8 +223,8 @@ func (h *sidsHook) tryDerive(ctx context.Context, ch chan<- string) {
|
||||
}
|
||||
}
|
||||
|
||||
func backoff(ctx context.Context, i int) bool {
|
||||
next := computeBackoff(i)
|
||||
func backoff(ctx context.Context, attempt int) bool {
|
||||
next := computeBackoff(attempt)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
|
||||
@@ -28,6 +28,12 @@ func cleanupDir(t *testing.T, dir string) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func sidecar(task string) (string, structs.TaskKind) {
|
||||
name := structs.ConnectProxyPrefix + "-" + task
|
||||
kind := structs.TaskKind(structs.ConnectProxyPrefix + ":" + task)
|
||||
return name, kind
|
||||
}
|
||||
|
||||
func TestSIDSHook_recoverToken(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -35,8 +41,13 @@ func TestSIDSHook_recoverToken(t *testing.T) {
|
||||
secrets := tmpDir(t)
|
||||
defer cleanupDir(t, secrets)
|
||||
|
||||
taskName, taskKind := sidecar("foo")
|
||||
|
||||
h := newSIDSHook(sidsHookConfig{
|
||||
task: &structs.Task{Name: "task1"},
|
||||
task: &structs.Task{
|
||||
Name: taskName,
|
||||
Kind: taskKind,
|
||||
},
|
||||
logger: testlog.HCLogger(t),
|
||||
})
|
||||
|
||||
@@ -56,8 +67,13 @@ func TestSIDSHook_recoverToken_empty(t *testing.T) {
|
||||
secrets := tmpDir(t)
|
||||
defer cleanupDir(t, secrets)
|
||||
|
||||
taskName, taskKind := sidecar("foo")
|
||||
|
||||
h := newSIDSHook(sidsHookConfig{
|
||||
task: &structs.Task{Name: "task1"},
|
||||
task: &structs.Task{
|
||||
Name: taskName,
|
||||
Kind: taskKind,
|
||||
},
|
||||
logger: testlog.HCLogger(t),
|
||||
})
|
||||
|
||||
@@ -73,9 +89,14 @@ func TestSIDSHook_deriveSIToken(t *testing.T) {
|
||||
secrets := tmpDir(t)
|
||||
defer cleanupDir(t, secrets)
|
||||
|
||||
taskName, taskKind := sidecar("task1")
|
||||
|
||||
h := newSIDSHook(sidsHookConfig{
|
||||
alloc: &structs.Allocation{ID: "a1"},
|
||||
task: &structs.Task{Name: "task1"},
|
||||
alloc: &structs.Allocation{ID: "a1"},
|
||||
task: &structs.Task{
|
||||
Name: taskName,
|
||||
Kind: taskKind,
|
||||
},
|
||||
logger: testlog.HCLogger(t),
|
||||
sidsClient: consul.NewMockServiceIdentitiesClient(),
|
||||
})
|
||||
@@ -83,7 +104,7 @@ func TestSIDSHook_deriveSIToken(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
token, err := h.deriveSIToken(ctx)
|
||||
r.NoError(err)
|
||||
r.True(helper.IsUUID(token))
|
||||
r.True(helper.IsUUID(token), "token: %q", token)
|
||||
}
|
||||
|
||||
func TestSIDSHook_computeBackoff(t *testing.T) {
|
||||
|
||||
@@ -67,7 +67,6 @@ func (tr *TaskRunner) initHooks() {
|
||||
newArtifactHook(tr, hookLogger),
|
||||
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
|
||||
newDeviceHook(tr.devicemanager, hookLogger),
|
||||
newEnvoyBootstrapHook(alloc, tr.clientConfig.ConsulConfig.Addr, hookLogger),
|
||||
}
|
||||
|
||||
// If Vault is enabled, add the hook
|
||||
@@ -107,6 +106,8 @@ func (tr *TaskRunner) initHooks() {
|
||||
}))
|
||||
}
|
||||
|
||||
// If this is a Connect sidecar proxy (or a Connect Native) service,
|
||||
// add the sidsHook for requesting a Service Identity token (if ACLs).
|
||||
if task.UsesConnect() {
|
||||
tr.runnerHooks = append(tr.runnerHooks, newSIDSHook(sidsHookConfig{
|
||||
alloc: tr.Alloc(),
|
||||
@@ -117,6 +118,13 @@ func (tr *TaskRunner) initHooks() {
|
||||
}))
|
||||
}
|
||||
|
||||
// envoy bootstrap must execute after sidsHook maybe sets SI token
|
||||
tr.runnerHooks = append(tr.runnerHooks, newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
|
||||
alloc: alloc,
|
||||
consulHTTPAddr: tr.clientConfig.ConsulConfig.Addr,
|
||||
logger: hookLogger,
|
||||
}))
|
||||
|
||||
// If there are any script checks, add the hook
|
||||
scriptCheckHook := newScriptCheckHook(scriptCheckHookConfig{
|
||||
alloc: tr.Alloc(),
|
||||
|
||||
@@ -1082,29 +1082,31 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
|
||||
actualEvents[i] = string(e.Type)
|
||||
}
|
||||
require.Equal(t, actualEvents, expectedEvents)
|
||||
|
||||
require.Equal(t, structs.TaskStateDead, state.State)
|
||||
require.True(t, state.Failed, pretty.Sprint(state))
|
||||
}
|
||||
|
||||
type mockEnvoyBootstrapHook struct{}
|
||||
type mockEnvoyBootstrapHook struct {
|
||||
// nothing
|
||||
}
|
||||
|
||||
func (mockEnvoyBootstrapHook) Name() string {
|
||||
func (_ *mockEnvoyBootstrapHook) Name() string {
|
||||
return "mock_envoy_bootstrap"
|
||||
}
|
||||
|
||||
func (*mockEnvoyBootstrapHook) Prestart(_ context.Context, _ *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
|
||||
func (m *mockEnvoyBootstrapHook) Prestart(_ context.Context, _ *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
|
||||
resp.Done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// The envoy bootstrap hook tries to connect to consul and run the envoy
|
||||
// bootstrap command, so turn it off when testing connect jobs that are not
|
||||
// using envoy (for now?).
|
||||
func disableEnvoyBootstrapHook(tr *TaskRunner) {
|
||||
// using envoy.
|
||||
func useMockEnvoyBootstrapHook(tr *TaskRunner) {
|
||||
mock := new(mockEnvoyBootstrapHook)
|
||||
for i, hook := range tr.runnerHooks {
|
||||
if _, ok := hook.(*envoyBootstrapHook); ok {
|
||||
tr.runnerHooks[i] = new(mockEnvoyBootstrapHook)
|
||||
tr.runnerHooks[i] = mock
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1138,7 +1140,8 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) {
|
||||
tr, err := NewTaskRunner(trConfig)
|
||||
r.NoError(err)
|
||||
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||
disableEnvoyBootstrapHook(tr) // turn off envoy bootstrap
|
||||
useMockEnvoyBootstrapHook(tr) // mock the envoy bootstrap hook
|
||||
|
||||
go tr.Run()
|
||||
|
||||
// assert task runner blocks on SI token
|
||||
@@ -1189,10 +1192,12 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) {
|
||||
|
||||
// control when we get a Consul SI token
|
||||
token := "12345678-1234-1234-1234-1234567890"
|
||||
siTaskName := task.Kind.Value()
|
||||
deriveCount := 0
|
||||
deriveFn := func(*structs.Allocation, []string) (map[string]string, error) {
|
||||
if deriveCount > 0 {
|
||||
return map[string]string{task.Name: token}, nil
|
||||
|
||||
return map[string]string{siTaskName: token}, nil
|
||||
}
|
||||
deriveCount++
|
||||
return nil, structs.NewRecoverableError(errors.New("try again later"), true)
|
||||
@@ -1204,7 +1209,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) {
|
||||
tr, err := NewTaskRunner(trConfig)
|
||||
r.NoError(err)
|
||||
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||
disableEnvoyBootstrapHook(tr) // turn off envoy bootstrap
|
||||
useMockEnvoyBootstrapHook(tr) // mock the envoy bootstrap
|
||||
go tr.Run()
|
||||
|
||||
// assert task runner blocks on SI token
|
||||
@@ -1247,14 +1252,15 @@ func TestTaskRunner_DeriveSIToken_Unrecoverable(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
// SI token derivation suffers a non-retryable error
|
||||
siTaskName := task.Kind.Value()
|
||||
siClient := trConfig.ConsulSI.(*consulapi.MockServiceIdentitiesClient)
|
||||
siClient.SetDeriveTokenError(alloc.ID, []string{task.Name}, errors.New("non-recoverable"))
|
||||
siClient.SetDeriveTokenError(alloc.ID, []string{siTaskName}, errors.New("non-recoverable"))
|
||||
|
||||
tr, err := NewTaskRunner(trConfig)
|
||||
r.NoError(err)
|
||||
|
||||
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||
disableEnvoyBootstrapHook(tr) // turn off envoy bootstrap
|
||||
useMockEnvoyBootstrapHook(tr) // mock the envoy bootstrap hook
|
||||
go tr.Run()
|
||||
|
||||
// Wait for the task to die
|
||||
@@ -1826,7 +1832,7 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) {
|
||||
require.Fail(t, "timed out waiting for task to complete")
|
||||
}
|
||||
|
||||
// Assert the task ran and never restarted
|
||||
// Assert the task unblocked and never restarted
|
||||
state := tr.TaskState()
|
||||
require.Equal(t, structs.TaskStateDead, state.State)
|
||||
require.False(t, state.Failed)
|
||||
|
||||
@@ -148,7 +148,7 @@ func (t *tasklet) run() *taskletHandle {
|
||||
|
||||
select {
|
||||
case <-t.shutdownCh:
|
||||
// We've been told to exit and just ran so exit
|
||||
// We've been told to exit and just unblocked so exit
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
@@ -96,7 +96,7 @@ func (h *volumeHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartR
|
||||
return err
|
||||
}
|
||||
|
||||
// Because this hook is also ran on restores, we only add mounts that do not
|
||||
// Because this hook is also unblocked on restores, we only add mounts that do not
|
||||
// already exist. Although this loop is somewhat expensive, there are only
|
||||
// a small number of mounts that exist within most individual tasks. We may
|
||||
// want to revisit this using a `hookdata` param to be "mount only once"
|
||||
|
||||
Reference in New Issue
Block a user