Files
nomad/client/allocrunner/consul_grpc_sock_hook_test.go
Tim Gross e168548341 provide allocrunner hooks with prebuilt taskenv and fix mutation bugs (#25373)
Some of our allocrunner hooks require a task environment for interpolating values based on the node or allocation. But several of the hooks accept an already-built environment or builder and then keep that in memory. Both of these retain a copy of all the node attributes and allocation metadata, which balloons memory usage until the allocation is GC'd.

While we'd like to look into ways to avoid keeping the allocrunner around entirely (see #25372), for now we can significantly reduce memory usage by creating the task environment on-demand when calling allocrunner methods, rather than persisting it in the allocrunner hooks.

In doing so, we uncover two other bugs:
* The WID manager, the group service hook, and the checks hook have to interpolate services for specific tasks. They mutated a taskenv builder to do so, but each time they mutate the builder, they write to the same environment map. When a group has multiple tasks, it's possible for one task to set an environment variable that would then be interpolated in the service definition for another task if that task did not have that environment variable. Only the service definition interpolation is impacted. This does not leak env vars across running tasks, as each taskrunner has its own builder.

  To fix this, we move the `UpdateTask` method off the builder and onto the taskenv as the `WithTask` method. This makes a shallow copy of the taskenv with a deep clone of the environment map used for interpolation, and then overwrites the environment from the task.

* The checks hook interpolates Nomad native service checks only on `Prerun` and not on `Update`. This could cause unexpected deregistration and registration of checks during in-place updates. To fix this, we make sure we interpolate in the `Update` method.

I also bumped into an incorrectly implemented interface in the CSI hook. I've pulled that and some better guardrails out to https://github.com/hashicorp/nomad/pull/25472.

Fixes: https://github.com/hashicorp/nomad/issues/25269
Fixes: https://hashicorp.atlassian.net/browse/NET-12310
Ref: https://github.com/hashicorp/nomad/issues/25372
2025-03-24 12:05:04 -04:00

279 lines
6.9 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package allocrunner
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net"
	"path/filepath"
	"sync"
	"testing"

	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/nomad/structs/config"
	"github.com/shoenig/test/must"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
// TestConsulGRPCSocketHook_PrerunPostrun_Ok asserts that a proxy is started when the
// Consul unix socket hook's Prerun method is called and stopped when the
// Postrun method is called.
func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) {
	ci.Parallel(t)

	// As of Consul 1.6.0 the test server does not support the gRPC
	// endpoint so we have to fake it.
	fakeConsul, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	defer fakeConsul.Close()

	consulConfigs := map[string]*config.ConsulConfig{
		structs.ConsulDefaultCluster: {
			GRPCAddr: fakeConsul.Addr().String(),
		},
	}

	alloc := mock.ConnectAlloc()
	logger := testlog.HCLogger(t)
	allocDir, cleanup := allocdir.TestAllocDir(t, logger, "EnvoyBootstrap", alloc.ID)
	defer cleanup()

	// Start the unix socket proxy
	h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfigs, map[string]string{})
	require.NoError(t, h.Prerun(nil))

	gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket)
	envoyConn, err := net.Dial("unix", gRPCSock)
	require.NoError(t, err)
	// Release the client side of the socket when the test ends, even if an
	// assertion below aborts the test early.
	defer envoyConn.Close()

	// Write to Consul to ensure data is proxied out of the netns
	input := bytes.Repeat([]byte{'X'}, 5*1024)
	errCh := make(chan error, 1)
	go func() {
		_, err := envoyConn.Write(input)
		errCh <- err
	}()

	// Accept the connection from the netns
	consulConn, err := fakeConsul.Accept()
	require.NoError(t, err)
	defer consulConn.Close()

	// A single Read may legally return fewer bytes than the buffer holds, so
	// read until the whole payload has arrived before comparing.
	output := make([]byte, len(input))
	_, err = io.ReadFull(consulConn, output)
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.Equal(t, input, output)

	// Read from Consul to ensure data is proxied into the netns
	input = bytes.Repeat([]byte{'Y'}, 5*1024)
	go func() {
		_, err := consulConn.Write(input)
		errCh <- err
	}()

	_, err = io.ReadFull(envoyConn, output)
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.Equal(t, input, output)

	// Stop the unix socket proxy
	require.NoError(t, h.Postrun())

	// Consul reads should error
	n, err := consulConn.Read(output)
	require.Error(t, err)
	require.Zero(t, n)

	// Envoy reads and writes should error
	n, err = envoyConn.Write(input)
	require.Error(t, err)
	require.Zero(t, n)

	n, err = envoyConn.Read(output)
	require.Error(t, err)
	require.Zero(t, n)
}
// TestConsulGRPCSocketHook_Prerun_Error checks how the hook reacts to a Consul
// configuration that carries no usable address: Prerun must fail only when the
// alloc actually needs a gRPC proxy.
func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) {
	ci.Parallel(t)

	log := testlog.HCLogger(t)

	// Neither Addr nor GRPCAddr is set, which makes this config invalid.
	invalidConfigs := map[string]*config.ConsulConfig{
		structs.ConsulDefaultCluster: {},
	}

	plainAlloc := mock.Alloc()
	sidecarAlloc := mock.ConnectAlloc()

	dir, cleanup := allocdir.TestAllocDir(t, log, "EnvoyBootstrap", plainAlloc.ID)
	defer cleanup()

	// Case 1: no Connect proxy sidecar, so the invalid config is never
	// consulted and nothing fails.
	{
		hook := newConsulGRPCSocketHook(log, plainAlloc, dir, invalidConfigs, map[string]string{})

		prerunErr := hook.Prerun(nil)
		must.NoError(t, prerunErr)

		// Nothing was started, so Postrun has nothing to do.
		postrunErr := hook.Postrun()
		must.NoError(t, postrunErr)
	}

	// Case 2: the alloc has a Connect proxy sidecar, so the missing Consul
	// address must surface as a Prerun error.
	{
		hook := newConsulGRPCSocketHook(log, sidecarAlloc, dir, invalidConfigs, map[string]string{})

		prerunErr := hook.Prerun(nil)
		must.ErrorContains(t, prerunErr, `consul address for cluster "" must be set on nomad client`)

		// Nothing was started, so Postrun has nothing to do.
		postrunErr := hook.Postrun()
		must.NoError(t, postrunErr)
	}

	// Case 3: an alloc that starts without a sidecar cannot gain one through
	// an in-place update; Update must reject the change.
	{
		hook := newConsulGRPCSocketHook(log, plainAlloc, dir, invalidConfigs, map[string]string{})

		prerunErr := hook.Prerun(nil)
		must.NoError(t, prerunErr)

		updateErr := hook.Update(&interfaces.RunnerUpdateRequest{
			Alloc: sidecarAlloc,
		})
		must.EqError(t, updateErr, "cannot update alloc to Connect in-place")

		// Nothing was started, so Postrun has nothing to do.
		postrunErr := hook.Postrun()
		must.NoError(t, postrunErr)
	}
}
// TestConsulGRPCSocketHook_proxy_Unix asserts that the destination can be a unix
// socket path.
//
// The test runs the proxy between a TCP listener (standing in for the socket
// inside the netns) and a unix socket listener (standing in for Consul), sends
// one byte in each direction, and fails if any goroutine reports an error.
func TestConsulGRPCSocketHook_proxy_Unix(t *testing.T) {
	ci.Parallel(t)
	dir := t.TempDir()

	// Setup fake listener that would be inside the netns (normally a unix
	// socket, but it doesn't matter for this test).
	src, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	defer src.Close()

	// Setup fake listener that would be Consul outside the netns. Use a
	// socket as Consul may be configured to listen on a unix socket.
	destFn := filepath.Join(dir, "fakeconsul.sock")
	dest, err := net.Listen("unix", destFn)
	require.NoError(t, err)
	defer dest.Close()

	// Collect errors (capacity must exceed the number of errors the
	// goroutines below can send so that no send ever blocks)
	errCh := make(chan error, 10)

	// Block until completion
	wg := sync.WaitGroup{}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wg.Add(1)
	go func() {
		defer wg.Done()
		proxy(ctx, testlog.HCLogger(t), "unix://"+destFn, src)
	}()

	// Unblock the fake Consul's Accept once the context is cancelled. Without
	// this, a failure in the fake Envoy before the proxy dials the destination
	// would leave Accept blocked forever and wg.Wait would hang the test.
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
		// Error deliberately ignored: on the success path the fake Consul
		// goroutine has already closed the listener.
		_ = dest.Close()
	}()

	// Fake Envoy
	// Connect and write to the src (netns) side of the proxy; then read
	// and exit.
	wg.Add(1)
	go func() {
		defer func() {
			// Cancel after final read has completed (or an error
			// has occurred)
			cancel()
			wg.Done()
		}()

		addr := src.Addr()
		conn, err := net.Dial(addr.Network(), addr.String())
		if err != nil {
			errCh <- err
			return
		}
		defer conn.Close()

		if _, err := conn.Write([]byte{'X'}); err != nil {
			errCh <- err
			return
		}

		recv := make([]byte, 1)
		if _, err := conn.Read(recv); err != nil {
			errCh <- err
			return
		}

		if expected := byte('Y'); recv[0] != expected {
			errCh <- fmt.Errorf("expected %q but received: %q", expected, recv[0])
			return
		}
	}()

	// Fake Consul on a unix socket
	// Listen, receive 1 byte, write a response, and exit
	wg.Add(1)
	go func() {
		defer wg.Done()

		conn, err := dest.Accept()
		if err != nil {
			errCh <- err
			return
		}
		// Register the close before any further early return so the accepted
		// connection is released even if closing the listener fails.
		defer conn.Close()

		// Close listener now. No more connections expected.
		if err := dest.Close(); err != nil {
			errCh <- err
			return
		}

		recv := make([]byte, 1)
		if _, err := conn.Read(recv); err != nil {
			errCh <- err
			return
		}

		if expected := byte('X'); recv[0] != expected {
			errCh <- fmt.Errorf("expected %q but received: %q", expected, recv[0])
			return
		}

		if _, err := conn.Write([]byte{'Y'}); err != nil {
			errCh <- err
			return
		}
	}()

	// Wait for goroutines to complete
	wg.Wait()

	// Make sure no errors occurred
	for len(errCh) > 0 {
		assert.NoError(t, <-errCh)
	}
}