Merge branch 'main' into f-NMD-763-introduction

James Rasell
2025-08-05 08:56:51 +01:00
144 changed files with 3368 additions and 878 deletions

View File

@@ -10,19 +10,16 @@ import (
"io"
"time"
log "github.com/hashicorp/go-hclog"
metrics "github.com/hashicorp/go-metrics/compat"
"github.com/hashicorp/go-msgpack/v2/codec"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
log "github.com/hashicorp/go-hclog"
metrics "github.com/hashicorp/go-metrics/compat"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)
type Agent struct {
@@ -32,6 +29,7 @@ type Agent struct {
func NewAgentEndpoint(c *Client) *Agent {
a := &Agent{c: c}
a.c.streamingRpcs.Register("Agent.Monitor", a.monitor)
a.c.streamingRpcs.Register("Agent.MonitorExport", a.monitorExport)
return a
}
@@ -84,7 +82,6 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr
func (a *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()
// Decode arguments
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
@@ -117,7 +114,7 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{
m := monitor.New(512, a.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
IncludeLocation: args.LogIncludeLocation,
@@ -143,8 +140,8 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
<-ctx.Done()
}()
logCh := monitor.Start()
defer monitor.Stop()
logCh := m.Start()
defer m.Stop()
initialOffset := int64(0)
// receive logs and build frames
@@ -164,49 +161,11 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
case <-ctx.Done():
break LOOP
}
}
}()
var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
break OUTER
}
var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}
streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText)
streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, false)
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
@@ -214,7 +173,7 @@ OUTER:
}
}
// Host collects data about the host evironment running the agent
// Host collects data about the host environment running the agent
func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataResponse) error {
aclObj, err := a.c.ResolveToken(args.AuthToken)
if err != nil {
@@ -233,3 +192,94 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo
reply.HostData = data
return nil
}
func (a *Agent) monitorExport(conn io.ReadWriteCloser) {
defer conn.Close()
// Decode arguments
var args cstructs.MonitorExportRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
}
// Check acl
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, pointer.Of(int64(403)), encoder)
return
} else if !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
return
}
nomadLogPath := a.c.GetConfig().LogFile
if args.OnDisk && nomadLogPath == "" {
handleStreamResultError(errors.New("No nomad log file defined"), pointer.Of(int64(400)), encoder)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := monitor.MonitorExportOpts{
Logger: a.c.logger,
LogsSince: args.LogsSince,
ServiceName: args.ServiceName,
NomadLogPath: nomadLogPath,
OnDisk: args.OnDisk,
Follow: args.Follow,
Context: ctx,
}
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameSize := 1024
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, frameSize)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
<-ctx.Done()
}()
m, err := monitor.NewExportMonitor(opts)
if err != nil {
handleStreamResultError(err, pointer.Of(int64(500)), encoder)
return
}
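// eofCancelCh is deliberately left nil: a nil channel never fires in a
// select, so cancellation at EOF is driven by the eofCancel flag below.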
var eofCancelCh chan error
streamCh := m.Start()
initialOffset := int64(0)
eofCancel := !opts.Follow
// receive logs and build frames
streamReader := monitor.NewStreamReader(streamCh, framer, int64(frameSize))
go func() {
defer framer.Destroy()
if err := streamReader.StreamFixed(ctx, initialOffset, "", 0, eofCancelCh, eofCancel); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
}
}()
streamEncoder := monitor.NewStreamEncoder(&buf, conn, encoder, frameCodec, args.PlainText)
streamErr := streamEncoder.EncodeStream(frames, errCh, ctx, framer, true)
if streamErr != nil {
handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder)
return
}
}
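For orientation, the wire protocol mirrors Agent.Monitor: the caller encodes a MonitorExportRequest, then decodes a stream of cstructs.StreamErrWrapper messages until the connection ends. A minimal client-side sketch under that assumption — consumeExport is a hypothetical helper, not part of this change, and assumes an already-dialed streaming connection:

import (
	"errors"
	"io"
	"os"

	"github.com/hashicorp/go-msgpack/v2/codec"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/nomad/structs"
)

func consumeExport(conn io.ReadWriteCloser, req *cstructs.MonitorExportRequest) error {
	defer conn.Close()

	enc := codec.NewEncoder(conn, structs.MsgpackHandle)
	dec := codec.NewDecoder(conn, structs.MsgpackHandle)

	// Send the request, then drain wrapped frames until the stream ends.
	if err := enc.Encode(req); err != nil {
		return err
	}
	for {
		var wrapper cstructs.StreamErrWrapper
		if err := dec.Decode(&wrapper); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if wrapper.Error != nil {
			return errors.New(wrapper.Error.Error())
		}
		// With PlainText set, Payload is raw log bytes; otherwise it is a
		// JSON-encoded stream frame.
		if _, err := os.Stdout.Write(wrapper.Payload); err != nil {
			return err
		}
	}
}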

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"testing"
"time"
@@ -18,11 +19,13 @@ import (
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -446,3 +449,74 @@ func TestAgentHost_ACL(t *testing.T) {
})
}
}
func TestMonitor_MonitorExport(t *testing.T) {
ci.Parallel(t)
// Create test file
testFilePath := monitor.PrepFile(t).Name()
testFileContents, err := os.ReadFile(testFilePath)
must.NoError(t, err)
// start server
s, root, cleanupS := nomad.TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := TestClient(t, func(c *config.Config) {
c.ACLEnabled = true
c.Servers = []string{s.GetConfig().RPCAddr.String()}
c.LogFile = testFilePath
})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
defer cleanupC()
testutil.WaitForLeader(t, s.RPC)
cases := []struct {
name string
expected string
serviceName string
token string
onDisk bool
expectErr bool
}{
{
name: "happy_path_golden_file",
onDisk: true,
expected: string(testFileContents),
token: root.SecretID,
},
{
name: "token_error",
onDisk: true,
expected: string(testFileContents),
token: tokenBad.SecretID,
expectErr: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := cstructs.MonitorExportRequest{
NodeID: "this is checked in the CLI",
OnDisk: tc.onDisk,
QueryOptions: structs.QueryOptions{
Region: "global",
AuthToken: tc.token,
},
}
builder, finalError := monitor.ExportMonitorClient_TestHelper(req, c, time.After(3*time.Second))
if tc.expectErr {
must.Error(t, finalError)
return
}
must.NoError(t, finalError)
must.NotNil(t, builder)
must.Eq(t, strings.TrimSpace(tc.expected), strings.TrimSpace(builder.String()))
})
}
}

View File

@@ -173,17 +173,17 @@ func NewAllocDir(logger hclog.Logger, clientAllocDir, clientMountsDir, allocID s
}
// NewTaskDir creates a new TaskDir and adds it to the AllocDir's TaskDirs map.
func (d *AllocDir) NewTaskDir(task *structs.Task) *TaskDir {
d.mu.Lock()
defer d.mu.Unlock()
func (a *AllocDir) NewTaskDir(task *structs.Task) *TaskDir {
a.mu.Lock()
defer a.mu.Unlock()
secretsSize := 0
if task.Resources != nil {
secretsSize = task.Resources.SecretsMB
}
td := d.newTaskDir(task.Name, secretsSize)
d.TaskDirs[task.Name] = td
td := a.newTaskDir(task.Name, secretsSize)
a.TaskDirs[task.Name] = td
return td
}
@@ -193,13 +193,13 @@ func (d *AllocDir) NewTaskDir(task *structs.Task) *TaskDir {
// Since a valid tar may have been written even when an error occurs, a special
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
d.mu.RLock()
defer d.mu.RUnlock()
func (a *AllocDir) Snapshot(w io.Writer) error {
a.mu.RLock()
defer a.mu.RUnlock()
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
allocDataDir := filepath.Join(a.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
for _, taskdir := range d.TaskDirs {
for _, taskdir := range a.TaskDirs {
rootPaths = append(rootPaths, taskdir.LocalDir)
}
@@ -213,7 +213,7 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// Include the path of the file name relative to the alloc dir
// so that we can put the files in the right directories
relPath, err := filepath.Rel(d.AllocDir, path)
relPath, err := filepath.Rel(a.AllocDir, path)
if err != nil {
return err
}
@@ -256,14 +256,14 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// directories in the archive
for _, path := range rootPaths {
if err := filepath.Walk(path, walkFn); err != nil {
allocID := filepath.Base(d.AllocDir)
allocID := filepath.Base(a.AllocDir)
if writeErr := writeError(tw, allocID, err); writeErr != nil {
// This could be bad; other side won't know
// snapshotting failed. It could also just mean
// the snapshotting side closed the connection
// prematurely and won't try to use the tar
// anyway.
d.logger.Warn("snapshotting failed and unable to write error marker", "error", writeErr)
a.logger.Warn("snapshotting failed and unable to write error marker", "error", writeErr)
}
return fmt.Errorf("failed to snapshot %s: %w", path, err)
}
@@ -273,20 +273,20 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
}
// Move other alloc directory's shared path and local dir to this alloc dir.
func (d *AllocDir) Move(other Interface, tasks []*structs.Task) error {
d.mu.RLock()
if !d.built {
func (a *AllocDir) Move(other Interface, tasks []*structs.Task) error {
a.mu.RLock()
if !a.built {
// Enforce the invariant that Build is called before Move
d.mu.RUnlock()
return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir)
a.mu.RUnlock()
return fmt.Errorf("unable to move to %q - alloc dir is not built", a.AllocDir)
}
// Moving is slow and only reads immutable fields, so unlock during heavy IO
d.mu.RUnlock()
a.mu.RUnlock()
// Move the data directory
otherDataDir := filepath.Join(other.ShareDirPath(), SharedDataDir)
dataDir := filepath.Join(d.SharedDir, SharedDataDir)
dataDir := filepath.Join(a.SharedDir, SharedDataDir)
if fileInfo, err := os.Stat(otherDataDir); fileInfo != nil && err == nil {
os.Remove(dataDir) // remove an empty data dir if it exists
if err := os.Rename(otherDataDir, dataDir); err != nil {
@@ -302,7 +302,7 @@ func (d *AllocDir) Move(other Interface, tasks []*structs.Task) error {
fileInfo, err := os.Stat(otherTaskLocal)
if fileInfo != nil && err == nil {
// TaskDirs haven't been built yet, so create it
newTaskDir := filepath.Join(d.AllocDir, task.Name)
newTaskDir := filepath.Join(a.AllocDir, task.Name)
if err := os.MkdirAll(newTaskDir, fileMode777); err != nil {
return fmt.Errorf("error creating task %q dir: %w", task.Name, err)
}
@@ -318,31 +318,31 @@ func (d *AllocDir) Move(other Interface, tasks []*structs.Task) error {
}
// Destroy tears down the previously built directory structure.
func (d *AllocDir) Destroy() error {
func (a *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
mErr := new(multierror.Error)
if err := d.UnmountAll(); err != nil {
if err := a.UnmountAll(); err != nil {
mErr = multierror.Append(mErr, err)
}
if err := os.RemoveAll(d.AllocDir); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("failed to remove alloc dir %q: %w", d.AllocDir, err))
if err := os.RemoveAll(a.AllocDir); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("failed to remove alloc dir %q: %w", a.AllocDir, err))
}
// Unset built since the alloc dir has been destroyed.
d.mu.Lock()
d.built = false
d.mu.Unlock()
a.mu.Lock()
a.built = false
a.mu.Unlock()
return mErr.ErrorOrNil()
}
// UnmountAll linked/mounted directories in task dirs.
func (d *AllocDir) UnmountAll() error {
d.mu.RLock()
defer d.mu.RUnlock()
func (a *AllocDir) UnmountAll() error {
a.mu.RLock()
defer a.mu.RUnlock()
mErr := new(multierror.Error)
for _, dir := range d.TaskDirs {
for _, dir := range a.TaskDirs {
if err := dir.Unmount(); err != nil {
mErr = multierror.Append(mErr, err)
}
@@ -352,41 +352,41 @@ func (d *AllocDir) UnmountAll() error {
}
// Build the directory tree for an allocation.
func (d *AllocDir) Build() error {
func (a *AllocDir) Build() error {
// Make the alloc directory, owned by the nomad process.
if err := os.MkdirAll(d.AllocDir, fileMode755); err != nil {
return fmt.Errorf("Failed to make the alloc directory %v: %w", d.AllocDir, err)
if err := os.MkdirAll(a.AllocDir, fileMode755); err != nil {
return fmt.Errorf("Failed to make the alloc directory %v: %w", a.AllocDir, err)
}
// Make the shared directory and make it available to all user/groups.
if err := allocMkdirAll(d.SharedDir, fileMode755); err != nil {
if err := allocMkdirAll(a.SharedDir, fileMode755); err != nil {
return err
}
// Create shared subdirs
for _, dir := range SharedAllocDirs {
p := filepath.Join(d.SharedDir, dir)
p := filepath.Join(a.SharedDir, dir)
if err := allocMkdirAll(p, fileMode777); err != nil {
return err
}
}
// Mark as built
d.mu.Lock()
d.built = true
d.mu.Unlock()
a.mu.Lock()
a.built = true
a.mu.Unlock()
return nil
}
// List returns the list of files at a path relative to the alloc dir
func (d *AllocDir) List(path string) ([]*cstructs.AllocFileInfo, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil {
func (a *AllocDir) List(path string) ([]*cstructs.AllocFileInfo, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(a.AllocDir, "", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %w", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
}
p := filepath.Join(d.AllocDir, path)
p := filepath.Join(a.AllocDir, path)
finfos, err := os.ReadDir(p)
if err != nil {
return []*cstructs.AllocFileInfo{}, err
@@ -409,14 +409,14 @@ func (d *AllocDir) List(path string) ([]*cstructs.AllocFileInfo, error) {
}
// Stat returns information about the file at a path relative to the alloc dir
func (d *AllocDir) Stat(path string) (*cstructs.AllocFileInfo, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil {
func (a *AllocDir) Stat(path string) (*cstructs.AllocFileInfo, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(a.AllocDir, "", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %w", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
}
p := filepath.Join(d.AllocDir, path)
p := filepath.Join(a.AllocDir, path)
info, err := os.Stat(p)
if err != nil {
return nil, err
@@ -459,28 +459,28 @@ func detectContentType(fileInfo os.FileInfo, path string) string {
}
// ReadAt returns a reader for a file at the path relative to the alloc dir
func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil {
func (a *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(a.AllocDir, "", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %w", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
}
p := filepath.Join(d.AllocDir, path)
p := filepath.Join(a.AllocDir, path)
// Check if it is trying to read into a secret directory
d.mu.RLock()
for _, dir := range d.TaskDirs {
a.mu.RLock()
for _, dir := range a.TaskDirs {
if caseInsensitiveHasPrefix(p, dir.SecretsDir) {
d.mu.RUnlock()
a.mu.RUnlock()
return nil, fmt.Errorf("Reading secret file prohibited: %s", path)
}
if caseInsensitiveHasPrefix(p, dir.PrivateDir) {
d.mu.RUnlock()
a.mu.RUnlock()
return nil, fmt.Errorf("Reading private file prohibited: %s", path)
}
}
d.mu.RUnlock()
a.mu.RUnlock()
f, err := os.Open(p)
if err != nil {
@@ -499,15 +499,15 @@ func caseInsensitiveHasPrefix(s, prefix string) bool {
// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed context.
func (d *AllocDir) BlockUntilExists(ctx context.Context, path string) (chan error, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil {
func (a *AllocDir) BlockUntilExists(ctx context.Context, path string) (chan error, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(a.AllocDir, "", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %w", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
}
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
p := filepath.Join(a.AllocDir, path)
watcher := getFileWatcher(p)
returnCh := make(chan error, 1)
t := &tomb.Tomb{}
@@ -525,8 +525,8 @@ func (d *AllocDir) BlockUntilExists(ctx context.Context, path string) (chan erro
// ChangeEvents watches for changes to the passed path relative to the
// allocation directory. The offset should be the last read offset. The context is
// used to clean up the watch.
func (d *AllocDir) ChangeEvents(ctx context.Context, path string, curOffset int64) (*watch.FileChanges, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(d.AllocDir, "", path); err != nil {
func (a *AllocDir) ChangeEvents(ctx context.Context, path string, curOffset int64) (*watch.FileChanges, error) {
if escapes, err := escapingfs.PathEscapesAllocDir(a.AllocDir, "", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %w", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
@@ -539,7 +539,7 @@ func (d *AllocDir) ChangeEvents(ctx context.Context, path string, curOffset int6
}()
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
p := filepath.Join(a.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.ChangeEvents(t, curOffset)
}
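Together, BlockUntilExists and ChangeEvents support a tail-style workflow over alloc-relative paths. A rough sketch of that pattern, assuming a built AllocDir a, a context ctx, and an illustrative log path (error handling abbreviated):

// Block until the file exists, then watch it for changes from offset 0.
errCh, err := a.BlockUntilExists(ctx, "alloc/logs/web.stdout.0")
if err != nil {
	return err
}
if err := <-errCh; err != nil {
	return err
}

changes, err := a.ChangeEvents(ctx, "alloc/logs/web.stdout.0", 0)
if err != nil {
	return err
}

select {
case <-changes.Modified:
	// new bytes were appended; re-read from the last offset
case <-changes.Truncated:
	// the file was truncated or rotated; reset the offset to zero
case <-changes.Deleted:
	// the file went away; stop watching
}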

View File

@@ -98,28 +98,28 @@ type TaskDir struct {
// create paths on disk.
//
// Call AllocDir.NewTaskDir to create new TaskDirs
func (d *AllocDir) newTaskDir(taskName string, secretsInMB int) *TaskDir {
taskDir := filepath.Join(d.AllocDir, taskName)
taskUnique := filepath.Base(d.AllocDir) + "-" + taskName
func (a *AllocDir) newTaskDir(taskName string, secretsInMB int) *TaskDir {
taskDir := filepath.Join(a.AllocDir, taskName)
taskUnique := filepath.Base(a.AllocDir) + "-" + taskName
if secretsInMB == 0 {
secretsInMB = defaultSecretDirTmpfsSize
}
return &TaskDir{
AllocDir: d.AllocDir,
AllocDir: a.AllocDir,
Dir: taskDir,
SharedAllocDir: filepath.Join(d.AllocDir, SharedAllocName),
LogDir: filepath.Join(d.AllocDir, SharedAllocName, LogDirName),
SharedAllocDir: filepath.Join(a.AllocDir, SharedAllocName),
LogDir: filepath.Join(a.AllocDir, SharedAllocName, LogDirName),
SharedTaskDir: filepath.Join(taskDir, SharedAllocName),
LocalDir: filepath.Join(taskDir, TaskLocal),
SecretsDir: filepath.Join(taskDir, TaskSecrets),
PrivateDir: filepath.Join(taskDir, TaskPrivate),
MountsAllocDir: filepath.Join(d.clientAllocMountsDir, taskUnique, "alloc"),
MountsTaskDir: filepath.Join(d.clientAllocMountsDir, taskUnique),
MountsSecretsDir: filepath.Join(d.clientAllocMountsDir, taskUnique, "secrets"),
skip: set.From[string]([]string{d.clientAllocDir, d.clientAllocMountsDir}),
logger: d.logger.Named("task_dir").With("task_name", taskName),
MountsAllocDir: filepath.Join(a.clientAllocMountsDir, taskUnique, "alloc"),
MountsTaskDir: filepath.Join(a.clientAllocMountsDir, taskUnique),
MountsSecretsDir: filepath.Join(a.clientAllocMountsDir, taskUnique, "secrets"),
skip: set.From[string]([]string{a.clientAllocDir, a.clientAllocMountsDir}),
logger: a.logger.Named("task_dir").With("task_name", taskName),
secretsInMB: secretsInMB,
}
}
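As the comment above says, newTaskDir is the internal constructor; callers use AllocDir.NewTaskDir, which takes the lock and records the result. An illustrative call (the task fixture is hypothetical):

task := &structs.Task{Name: "web"} // Resources unset, so the secrets tmpfs gets the default size
td := allocDir.NewTaskDir(task)

// td.LocalDir, td.SecretsDir, and td.PrivateDir are now registered in
// allocDir.TaskDirs["web"]; the directories themselves are created later,
// when the task dir is built.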

View File

@@ -594,10 +594,10 @@ func (c *cniNetworkConfigurator) Teardown(ctx context.Context, alloc *structs.Al
// best effort cleanup ipv6
ipt, iptErr := c.newIPTables(structs.NodeNetworkAF_IPv6)
if iptErr != nil {
c.logger.Debug("failed to detect ip6tables: %v", iptErr)
c.logger.Debug("failed to detect ip6tables", "error", iptErr)
} else {
if err := c.forceCleanup(ipt, alloc.ID); err != nil {
c.logger.Warn("ip6tables: %v", err)
c.logger.Warn("failed to cleanup iptables", "error", err)
}
}
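The change above is worth a note: hclog methods take a message plus alternating key/value pairs, so printf-style verbs are never interpolated. A small contrast:

// Wrong: hclog does not expand %v; the error ends up as a stray trailing
// argument instead of being formatted into the message.
c.logger.Debug("failed to detect ip6tables: %v", iptErr)

// Right: attach the error as a structured key/value pair.
c.logger.Debug("failed to detect ip6tables", "error", iptErr)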

View File

@@ -388,6 +388,9 @@ type Config struct {
// NodeMaxAllocs is an optional field that sets the maximum number of
// allocations a node can be assigned. Defaults to 0 and ignored if unset.
NodeMaxAllocs int
// LogFile is used by MonitorExport to stream a server's log file
LogFile string `hcl:"log_file"`
}
type APIListenerRegistrar interface {

View File

@@ -88,7 +88,7 @@ func (e *editor) Write(filename, content string) error {
return os.WriteFile(path, []byte(content), 0644)
}
// A Factory creates a Lifecycle which is an abstraction over the setup and
// Factory creates a Lifecycle which is an abstraction over the setup and
// teardown routines used for creating and destroying cgroups used for
// constraining Nomad tasks.
func Factory(allocID, task string, cores bool) Lifecycle {
@@ -106,7 +106,7 @@ func Factory(allocID, task string, cores bool) Lifecycle {
}
}
// A Lifecycle manages the lifecycle of the cgroup(s) of a task from the
// Lifecycle manages the lifecycle of the cgroup(s) of a task from the
// perspective of the Nomad client. That is, it creates and deletes the cgroups
// for a task, as well as provides last effort kill semantics for ensuring a
// process cannot stay alive beyond the intent of the client.
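A hedged sketch of how the Factory/Lifecycle pair is intended to be used, per the comments above; the Setup, Teardown, and Kill method names are assumptions drawn from the description, not confirmed against the interface:

// cores=false selects the shared cgroup partition for the task.
life := Factory("alloc-id", "web", false)
if err := life.Setup(); err != nil {
	return err
}
defer func() { _ = life.Teardown() }()

// If the task must not survive the client's intent, the lifecycle also
// offers last-effort kill semantics.
_ = life.Kill()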

View File

@@ -27,7 +27,8 @@ import (
"golang.org/x/sys/unix"
)
// Returns an object representing the current OS thread's network namespace
// GetCurrentNS returns an object representing the current OS thread's network
// namespace
func GetCurrentNS() (NetNS, error) {
// Lock the thread in case other goroutine executes in it and changes its
// network namespace after getCurrentThreadNetNSPath(), otherwise it might
@@ -140,7 +141,7 @@ func IsNSorErr(nspath string) error {
}
}
// Returns an object representing the namespace referred to by @path
// GetNS returns an object representing the namespace referred to by @path
func GetNS(nspath string) (NetNS, error) {
err := IsNSorErr(nspath)
if err != nil {

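For context, this package mirrors the upstream github.com/containernetworking/plugins/pkg/ns API, where a NetNS is normally entered via Do; the sketch below assumes that method and an illustrative namespace path:

netns, err := GetNS("/var/run/netns/example")
if err != nil {
	return err
}
defer netns.Close()

// Do pins the goroutine to an OS thread, switches into the target
// namespace, runs the callback, and restores the original namespace.
err = netns.Do(func(hostNS NetNS) error {
	// code here sees the target namespace's interfaces and routes
	return nil
})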
View File

@@ -87,18 +87,18 @@ type Topology struct {
OverrideWitholdCompute hw.MHz
}
func (t *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) {
t.nodeIDs = nodes
func (st *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) {
st.nodeIDs = nodes
if !nodes.Empty() {
t.Nodes = nodes.Slice()
st.Nodes = nodes.Slice()
} else {
t.Nodes = []uint8{}
st.Nodes = []uint8{}
}
}
func (t *Topology) SetNodesFrom(nodes []uint8) {
t.nodeIDs = idset.From[hw.NodeID](nodes)
t.Nodes = nodes
func (st *Topology) SetNodesFrom(nodes []uint8) {
st.nodeIDs = idset.From[hw.NodeID](nodes)
st.Nodes = nodes
}
// A Core represents one logical (vCPU) core on a processor. Basically the slice

View File

@@ -97,6 +97,9 @@ type StreamFramer struct {
// Captures whether the framer is running
running bool
// Confirms final flush sent
flushed bool
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
@@ -107,7 +110,6 @@ func NewStreamFramer(out chan<- *StreamFrame,
// Create the heartbeat and flush ticker
heartbeat := time.NewTicker(heartbeatRate)
flusher := time.NewTicker(batchWindow)
return &StreamFramer{
out: out,
frameSize: frameSize,
@@ -123,7 +125,6 @@ func NewStreamFramer(out chan<- *StreamFrame,
// Destroy is used to cleanup the StreamFramer and flush any pending frames
func (s *StreamFramer) Destroy() {
s.l.Lock()
wasShutdown := s.shutdown
s.shutdown = true
@@ -204,7 +205,6 @@ OUTER:
// Send() may have left a partial frame. Send it now.
if !s.f.IsCleared() {
s.f.Data = s.readData()
// Only send if there's actually data left
if len(s.f.Data) > 0 {
// Cannot select on shutdownCh as it's already closed
@@ -281,6 +281,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize || force {
// Clear since we are flushing the frame and capturing the file event.
// Subsequent data frames will be flushed based on the data size alone
// since they share the same fileevent.
@@ -309,3 +310,22 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
return nil
}
func (s *StreamFramer) IsFlushed() bool {
return s.flushed
}
func (s *StreamFramer) Flush() bool {
s.l.Lock()
// Send() may have left a partial frame. Send it now.
s.f.Data = s.readData()
// Only send if there's actually data left
if len(s.f.Data) > 0 {
s.out <- s.f.Copy()
}
s.flushed = true
s.l.Unlock()
return s.IsFlushed()
}
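A minimal sketch of the framer lifecycle with the new Flush hook; the buffer size and timings are illustrative, and the frames channel is assumed to be drained by a consumer:

frames := make(chan *StreamFrame, 32)
f := NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
f.Run()

// Queue data; full frames are emitted when the frame size or the batch
// window is reached.
_ = f.Send("nomad.log", "", []byte("log line\n"), 0)

// Push any partial frame to the consumer, then tear the framer down.
f.Flush()
f.Destroy()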

View File

@@ -62,6 +62,37 @@ type MonitorRequest struct {
structs.QueryOptions
}
type MonitorExportRequest struct {
// NodeID is the node we want to track the logs of
NodeID string
// ServerID is the server we want to track the logs of
ServerID string
// ServiceName is the systemd service for which we want to retrieve logs
// Cannot be used with OnDisk
ServiceName string
// Follow indicates that the monitor should continue to deliver logs until
// an outside interrupt. Cannot be used with OnDisk
Follow bool
// LogsSince sets the lookback time for monitorExport logs in hours
LogsSince string
// OnDisk indicates that nomad should export logs written to the configured nomad log path
OnDisk bool
// NomadLogPath is set to the nomad log path by the HTTP agent if OnDisk
// is true
NomadLogPath string
// PlainText disables base64 encoding.
PlainText bool
structs.QueryOptions
}
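Two illustrative ways to populate the request, reflecting the constraints documented above (OnDisk cannot be combined with ServiceName or Follow; all field values here are examples):

// Export the server's on-disk log file.
fileReq := MonitorExportRequest{
	ServerID:     "server-1",
	OnDisk:       true,
	QueryOptions: structs.QueryOptions{Region: "global"},
}

// Follow journald output for a systemd unit, looking back 24 hours.
journalReq := MonitorExportRequest{
	ServerID:     "server-1",
	ServiceName:  "nomad",
	Follow:       true,
	LogsSince:    "24",
	QueryOptions: structs.QueryOptions{Region: "global"},
}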
// AllocFileInfo holds information about a file inside the AllocDir
type AllocFileInfo struct {
Name string