Merge branch 'master' into f-policy-json

Buck Doyle
2019-11-20 11:20:07 -06:00
committed by GitHub
852 changed files with 27635 additions and 9245 deletions

View File

@@ -53,7 +53,7 @@ type Agent struct {
config *Config
configLock sync.Mutex
logger log.Logger
logger log.InterceptLogger
httpLogger log.Logger
logOutput io.Writer
@@ -87,7 +87,7 @@ type Agent struct {
}
// NewAgent is used to create a new agent with the given configuration
func NewAgent(config *Config, logger log.Logger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) {
func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) {
a := &Agent{
config: config,
logOutput: logOutput,
@@ -278,7 +278,9 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if gcInterval := agentConfig.Server.JobGCInterval; gcInterval != "" {
dur, err := time.ParseDuration(gcInterval)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to parse job_gc_interval: %v", err)
} else if dur <= time.Duration(0) {
return nil, fmt.Errorf("job_gc_interval should be greater than 0s")
}
conf.JobGCInterval = dur
}
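The new error handling wraps the parse failure with the field name and rejects non-positive durations. The same parse-then-validate shape presumably repeats for the other GC intervals in this function; a minimal standalone sketch (the helper name is hypothetical, not part of this change, and assumes fmt and time are imported):

    // parsePositiveDuration parses raw and requires the result to be > 0.
    func parsePositiveDuration(field, raw string) (time.Duration, error) {
        dur, err := time.ParseDuration(raw)
        if err != nil {
            return 0, fmt.Errorf("failed to parse %s: %v", field, err)
        }
        if dur <= 0 {
            return 0, fmt.Errorf("%s should be greater than 0s", field)
        }
        return dur, nil
    }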
@@ -830,40 +832,6 @@ func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {
return nil
}
// findLoopbackDevice iterates through all the interfaces on a machine and
// returns the name, IP address, and CIDR of the IPv4 loopback device
func (a *Agent) findLoopbackDevice() (string, string, string, error) {
var ifcs []net.Interface
var err error
ifcs, err = net.Interfaces()
if err != nil {
return "", "", "", err
}
for _, ifc := range ifcs {
addrs, err := ifc.Addrs()
if err != nil {
return "", "", "", err
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.IsLoopback() {
if ip.To4() == nil {
continue
}
return ifc.Name, ip.String(), addr.String(), nil
}
}
}
return "", "", "", fmt.Errorf("no loopback devices with IPV4 addr found")
}
// Leave is used to gracefully exit. Clients will inform servers
// of their departure so that allocations can be rescheduled.
func (a *Agent) Leave() error {

View File

@@ -1,16 +1,25 @@
package agent
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"sort"
"strconv"
"strings"
"github.com/docker/docker/pkg/ioutils"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec"
)
type Member struct {
@@ -87,6 +96,18 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques
self.Config.Vault.Token = "<redacted>"
}
if self.Config != nil && self.Config.ACL != nil && self.Config.ACL.ReplicationToken != "" {
self.Config.ACL.ReplicationToken = "<redacted>"
}
if self.Config != nil && self.Config.Consul != nil && self.Config.Consul.Token != "" {
self.Config.Consul.Token = "<redacted>"
}
if self.Config != nil && self.Config.Telemetry != nil && self.Config.Telemetry.CirconusAPIToken != "" {
self.Config.Telemetry.CirconusAPIToken = "<redacted>"
}
return self, nil
}
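Each sensitive field above follows the same guard-then-redact shape; a hedged sketch of that pattern factored into a helper (illustrative only, not part of this change):

    // redacted masks a non-empty token; callers still nil-check the parent structs.
    func redacted(token string) string {
        if token != "" {
            return "<redacted>"
        }
        return token
    }

    // e.g. self.Config.Consul.Token = redacted(self.Config.Consul.Token)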
@@ -133,6 +154,155 @@ func (s *HTTPServer) AgentMembersRequest(resp http.ResponseWriter, req *http.Req
return out, nil
}
func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
// Check agent read permissions
if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowAgentRead() {
return nil, structs.ErrPermissionDenied
}
// Get the provided log level.
logLevel := req.URL.Query().Get("log_level")
if logLevel == "" {
logLevel = "INFO"
}
if log.LevelFromString(logLevel) == log.NoLevel {
return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel))
}
logJSON := false
logJSONStr := req.URL.Query().Get("log_json")
if logJSONStr != "" {
parsed, err := strconv.ParseBool(logJSONStr)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unknown option for log json: %v", err))
}
logJSON = parsed
}
plainText := false
plainTextStr := req.URL.Query().Get("plain")
if plainTextStr != "" {
parsed, err := strconv.ParseBool(plainTextStr)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unknown option for plain: %v", err))
}
plainText = parsed
}
nodeID := req.URL.Query().Get("node_id")
// Build the request and parse the ACL token
args := cstructs.MonitorRequest{
NodeID: nodeID,
ServerID: req.URL.Query().Get("server_id"),
LogLevel: logLevel,
LogJSON: logJSON,
PlainText: plainText,
}
// If both a node and a server target were requested, return an error
if args.NodeID != "" && args.ServerID != "" {
return nil, CodedError(400, "Cannot target node and server simultaneously")
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
var handler structs.StreamingRpcHandler
var handlerErr error
if nodeID != "" {
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID)
if useLocalClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor")
} else if useClientRPC {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor")
} else if useServerRPC {
handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor")
} else {
handlerErr = CodedError(400, "No local Node and node_id not provided")
}
} else {
// No node ID; we want to monitor this server
handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor")
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
ctx, cancel := context.WithCancel(req.Context())
go func() {
<-ctx.Done()
httpPipe.Close()
}()
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
// create an error channel to handle errors
errCh := make(chan HTTPCodedError, 2)
// stream response
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}
for {
select {
case <-ctx.Done():
errCh <- nil
return
default:
}
var res cstructs.StreamErrWrapper
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
}
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()
handler(handlerPipe)
cancel()
codedErr := <-errCh
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||
strings.Contains(codedErr.Error(), "EOF")) {
codedErr = nil
}
return nil, codedErr
}
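For reference, a hedged sketch of consuming the new endpoint over plain HTTP; the address, port, and token are placeholders:

    package main

    import (
        "io"
        "log"
        "net/http"
        "os"
    )

    func main() {
        // Stream agent logs until the server closes the connection.
        req, err := http.NewRequest("GET",
            "http://127.0.0.1:4646/v1/agent/monitor?log_level=warn&plain=true", nil)
        if err != nil {
            log.Fatal(err)
        }
        req.Header.Set("X-Nomad-Token", "<acl token>") // only needed when ACLs are enabled
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()
        io.Copy(os.Stdout, resp.Body) // log lines arrive as they are written
    }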
func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)

View File

@@ -4,10 +4,12 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
@@ -22,45 +24,64 @@ import (
func TestHTTP_AgentSelf(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/agent/self", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.AgentSelfRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)
// Check the response
self := obj.(agentSelf)
if self.Config == nil {
t.Fatalf("bad: %#v", self)
}
if len(self.Stats) == 0 {
t.Fatalf("bad: %#v", self)
}
require.NotNil(self.Config)
require.NotNil(self.Config.ACL)
require.NotEmpty(self.Stats)
// Check the Vault config
if self.Config.Vault.Token != "" {
t.Fatalf("bad: %#v", self)
}
require.Empty(self.Config.Vault.Token)
// Assign a Vault token and require it is redacted.
s.Config.Vault.Token = "badc0deb-adc0-deba-dc0d-ebadc0debadc"
respW = httptest.NewRecorder()
obj, err = s.Server.AgentSelfRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)
self = obj.(agentSelf)
if self.Config.Vault.Token != "<redacted>" {
t.Fatalf("bad: %#v", self)
}
require.Equal("<redacted>", self.Config.Vault.Token)
// Assign a replication token and require it is redacted.
s.Config.ACL.ReplicationToken = "badc0deb-adc0-deba-dc0d-ebadc0debadc"
respW = httptest.NewRecorder()
obj, err = s.Server.AgentSelfRequest(respW, req)
require.NoError(err)
self = obj.(agentSelf)
require.Equal("<redacted>", self.Config.ACL.ReplicationToken)
// Check the Consul config
require.Empty(self.Config.Consul.Token)
// Assign a Consul token and require it is redacted.
s.Config.Consul.Token = "badc0deb-adc0-deba-dc0d-ebadc0debadc"
respW = httptest.NewRecorder()
obj, err = s.Server.AgentSelfRequest(respW, req)
require.NoError(err)
self = obj.(agentSelf)
require.Equal("<redacted>", self.Config.Consul.Token)
// Check the Circonus config
require.Empty(self.Config.Telemetry.CirconusAPIToken)
// Assign a Circonus API token and require it is redacted.
s.Config.Telemetry.CirconusAPIToken = "badc0deb-adc0-deba-dc0d-ebadc0debadc"
respW = httptest.NewRecorder()
obj, err = s.Server.AgentSelfRequest(respW, req)
require.NoError(err)
self = obj.(agentSelf)
require.Equal("<redacted>", self.Config.Telemetry.CirconusAPIToken)
})
}
@@ -230,6 +251,162 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) {
})
}
func TestHTTP_AgentMonitor(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// invalid log_json
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil)
require.Nil(t, err)
resp := newClosableRecorder()
// Make the request
_, err = s.Server.AgentMonitor(resp, req)
if code := err.(HTTPCodedError).Code(); code != 400 {
t.Fatalf("expected 400 response, got: %v", code)
}
}
// unknown log_level
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil)
require.Nil(t, err)
resp := newClosableRecorder()
// Make the request
_, err = s.Server.AgentMonitor(resp, req)
if code := err.(HTTPCodedError).Code(); code != 400 {
t.Fatalf("expected 400 response, got: %v", code)
}
}
// check for a specific log
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil)
require.Nil(t, err)
resp := newClosableRecorder()
defer resp.Close()
go func() {
_, err = s.Server.AgentMonitor(resp, req)
require.NoError(t, err)
}()
// send the same log until monitor sink is set up
maxLogAttempts := 10
tried := 0
testutil.WaitForResult(func() (bool, error) {
if tried < maxLogAttempts {
s.Server.logger.Warn("log that should be sent")
tried++
}
got := resp.Body.String()
want := `{"Data":"`
if strings.Contains(got, want) {
return true, nil
}
return false, fmt.Errorf("missing expected log, got: %v, want: %v", got, want)
}, func(err error) {
require.Fail(t, err.Error())
})
}
// plain param set to true
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=debug&plain=true", nil)
require.Nil(t, err)
resp := newClosableRecorder()
defer resp.Close()
go func() {
_, err = s.Server.AgentMonitor(resp, req)
require.NoError(t, err)
}()
// send the same log until monitor sink is set up
maxLogAttempts := 10
tried := 0
testutil.WaitForResult(func() (bool, error) {
if tried < maxLogAttempts {
s.Server.logger.Debug("log that should be sent")
tried++
}
got := resp.Body.String()
want := `[DEBUG] http: log that should be sent`
if strings.Contains(got, want) {
return true, nil
}
return false, fmt.Errorf("missing expected log, got: %v, want: %v", got, want)
}, func(err error) {
require.Fail(t, err.Error())
})
}
// stream logs for a given node
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn&node_id="+s.client.NodeID(), nil)
require.Nil(t, err)
resp := newClosableRecorder()
defer resp.Close()
go func() {
_, err = s.Server.AgentMonitor(resp, req)
require.NoError(t, err)
}()
// send the same log until monitor sink is set up
maxLogAttempts := 10
tried := 0
out := ""
testutil.WaitForResult(func() (bool, error) {
if tried < maxLogAttempts {
s.Server.logger.Debug("log that should not be sent")
s.Server.logger.Warn("log that should be sent")
tried++
}
output, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, err
}
out += string(output)
want := `{"Data":"`
if strings.Contains(out, want) {
return true, nil
}
return false, fmt.Errorf("missing expected log, got: %v, want: %v", out, want)
}, func(err error) {
require.Fail(t, err.Error())
})
}
})
}
type closableRecorder struct {
*httptest.ResponseRecorder
closer chan bool
}
func newClosableRecorder() *closableRecorder {
r := httptest.NewRecorder()
closer := make(chan bool)
return &closableRecorder{r, closer}
}
func (r *closableRecorder) Close() {
close(r.closer)
}
func (r *closableRecorder) CloseNotify() <-chan bool {
return r.closer
}
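The custom recorder exists because the streaming handler's machinery presumably expects the ResponseWriter to signal client disconnects, which a bare httptest.ResponseRecorder cannot; a one-line compile-time check of that assumption:

    // Assert *closableRecorder satisfies http.CloseNotifier (assumption, for illustration).
    var _ http.CloseNotifier = newClosableRecorder()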
func TestHTTP_AgentForceLeave(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {

View File

@@ -162,7 +162,7 @@ func TestAgent_ServerConfig(t *testing.T) {
if err := conf.normalizeAddrs(); err != nil {
t.Fatalf("error normalizing config: %v", err)
}
out, err = a.serverConfig()
_, err = a.serverConfig()
if err == nil || !strings.Contains(err.Error(), "unknown unit") {
t.Fatalf("expected unknown unit error, got: %#v", err)
}
@@ -172,24 +172,36 @@ func TestAgent_ServerConfig(t *testing.T) {
t.Fatalf("error normalizing config: %v", err)
}
out, err = a.serverConfig()
if err != nil {
t.Fatalf("error getting server config: %s", err)
}
if threshold := out.NodeGCThreshold; threshold != time.Second*10 {
t.Fatalf("expect 10s, got: %s", threshold)
}
conf.Server.HeartbeatGrace = 37 * time.Second
out, err = a.serverConfig()
if err != nil {
t.Fatalf("error getting server config: %s", err)
}
if threshold := out.HeartbeatGrace; threshold != time.Second*37 {
t.Fatalf("expect 37s, got: %s", threshold)
}
conf.Server.MinHeartbeatTTL = 37 * time.Second
out, err = a.serverConfig()
if err != nil {
t.Fatalf("error getting server config: %s", err)
}
if min := out.MinHeartbeatTTL; min != time.Second*37 {
t.Fatalf("expect 37s, got: %s", min)
}
conf.Server.MaxHeartbeatsPerSecond = 11.0
out, err = a.serverConfig()
if err != nil {
t.Fatalf("error getting server config: %s", err)
}
if max := out.MaxHeartbeatsPerSecond; max != 11.0 {
t.Fatalf("expect 11, got: %v", max)
}

View File

@@ -346,7 +346,7 @@ func TestHTTP_AllocRestart_ACL(t *testing.T) {
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with an invalid token and expect it to fail
@@ -360,7 +360,7 @@ func TestHTTP_AllocRestart_ACL(t *testing.T) {
setToken(req, token)
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with a valid token
@@ -376,7 +376,7 @@ func TestHTTP_AllocRestart_ACL(t *testing.T) {
setToken(req, token)
_, err = s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with a management token
@@ -523,7 +523,7 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
respW := httptest.NewRecorder()
_, err := s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with an invalid token and expect failure
@@ -533,7 +533,7 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with a valid token
@@ -545,7 +545,7 @@ func TestHTTP_AllocStats_ACL(t *testing.T) {
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with a management token
@@ -812,7 +812,7 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
respW := httptest.NewRecorder()
_, err := s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with an invalid token and expect failure
@@ -822,7 +822,7 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with a valid token
@@ -834,7 +834,7 @@ func TestHTTP_AllocGC_ACL(t *testing.T) {
setToken(req, token)
_, err := s.Server.ClientAllocRequest(respW, req)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
require.True(structs.IsErrUnknownAllocation(err), "(%T) %v", err, err)
}
// Try request with a management token

File diff suppressed because one or more lines are too long

View File

@@ -15,20 +15,21 @@ import (
"syscall"
"time"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/armon/go-metrics/circonus"
"github.com/armon/go-metrics/datadog"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-checkpoint"
"github.com/hashicorp/go-discover"
"github.com/hashicorp/go-hclog"
checkpoint "github.com/hashicorp/go-checkpoint"
discover "github.com/hashicorp/go-discover"
hclog "github.com/hashicorp/go-hclog"
gsyslog "github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/hashicorp/nomad/helper"
flaghelper "github.com/hashicorp/nomad/helper/flag-helpers"
gatedwriter "github.com/hashicorp/nomad/helper/gated-writer"
"github.com/hashicorp/nomad/helper/logging"
"github.com/hashicorp/nomad/helper/winsvc"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/version"
"github.com/mitchellh/cli"
@@ -278,14 +279,14 @@ func (c *Command) readConfig() *Config {
config.PluginDir = filepath.Join(config.DataDir, "plugins")
}
if !c.isValidConfig(config) {
if !c.isValidConfig(config, cmdConfig) {
return nil
}
return config
}
func (c *Command) isValidConfig(config *Config) bool {
func (c *Command) isValidConfig(config, cmdConfig *Config) bool {
// Check that the server is running in at least one mode.
if !(config.Server.Enabled || config.Client.Enabled) {
@@ -361,19 +362,20 @@ func (c *Command) isValidConfig(config *Config) bool {
}
// Check the bootstrap flags
if config.Server.BootstrapExpect > 0 && !config.Server.Enabled {
if !config.Server.Enabled && cmdConfig.Server.BootstrapExpect > 0 {
// report an error if BootstrapExpect is set in CLI but server is disabled
c.Ui.Error("Bootstrap requires server mode to be enabled")
return false
}
if config.Server.BootstrapExpect == 1 {
if config.Server.Enabled && config.Server.BootstrapExpect == 1 {
c.Ui.Error("WARNING: Bootstrap mode enabled! Potentially unsafe operation.")
}
return true
}
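Passing cmdConfig separately lets validation distinguish flag-sourced values from values merged out of config files; a hedged illustration (struct literals abbreviated):

    // bootstrap_expect in a config file for a client-only agent no longer errors...
    fileCfg := &Config{Server: &ServerConfig{Enabled: false, BootstrapExpect: 3}}
    cmdCfg := &Config{Server: &ServerConfig{}} // no -bootstrap-expect flag given
    _ = c.isValidConfig(fileCfg, cmdCfg) // passes the bootstrap check

    // ...but -bootstrap-expect on the CLI without -server still fails fast.
    cmdCfg.Server.BootstrapExpect = 3
    _ = c.isValidConfig(fileCfg, cmdCfg) // "Bootstrap requires server mode to be enabled"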
// setupLoggers is used to setup the logGate, logWriter, and our logOutput
func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, io.Writer) {
// setupLoggers is used to set up the logGate and our logOutput
func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, io.Writer) {
// Setup logging. First create the gated log writer, which will
// store logs until we're ready to show them. Then create the level
// filter, filtering logs of the specified level.
@@ -388,35 +390,64 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter,
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
c.logFilter.MinLevel, c.logFilter.Levels))
return nil, nil, nil
return nil, nil
}
// Build the list of writers that will be combined into our logOutput
writers := []io.Writer{c.logFilter}
// Check if syslog is enabled
var syslog io.Writer
if config.EnableSyslog {
l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "nomad")
if err != nil {
c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err))
return nil, nil, nil
return nil, nil
}
syslog = &SyslogWrapper{l, c.logFilter}
writers = append(writers, &SyslogWrapper{l, c.logFilter})
}
// Create a log writer, and wrap a logOutput around it
logWriter := NewLogWriter(512)
var logOutput io.Writer
if syslog != nil {
logOutput = io.MultiWriter(c.logFilter, logWriter, syslog)
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
// Check if file logging is enabled
if config.LogFile != "" {
dir, fileName := filepath.Split(config.LogFile)
// If a path is provided but has no filename, a default filename is used.
if fileName == "" {
fileName = "nomad.log"
}
// Use the user-specified log rotation duration if one was given
var logRotateDuration time.Duration
if config.LogRotateDuration != "" {
duration, err := time.ParseDuration(config.LogRotateDuration)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse log rotation duration: %v", err))
return nil, nil
}
logRotateDuration = duration
} else {
// Default to 24 hrs if no rotation period is specified
logRotateDuration = 24 * time.Hour
}
logFile := &logFile{
logFilter: c.logFilter,
fileName: fileName,
logPath: dir,
duration: logRotateDuration,
MaxBytes: config.LogRotateBytes,
MaxFiles: config.LogRotateMaxFiles,
}
writers = append(writers, logFile)
}
c.logOutput = logOutput
log.SetOutput(logOutput)
return logGate, logWriter, logOutput
c.logOutput = io.MultiWriter(writers...)
log.SetOutput(c.logOutput)
return logGate, c.logOutput
}
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logger hclog.Logger, logOutput io.Writer, inmem *metrics.InmemSink) error {
func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) error {
c.Ui.Output("Starting Nomad agent...")
agent, err := NewAgent(config, logger, logOutput, inmem)
if err != nil {
@@ -565,13 +596,13 @@ func (c *Command) Run(args []string) int {
}
// Setup the log outputs
logGate, _, logOutput := c.setupLoggers(config)
logGate, logOutput := c.setupLoggers(config)
if logGate == nil {
return 1
}
// Create logger
logger := hclog.New(&hclog.LoggerOptions{
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: "agent",
Level: hclog.LevelFromString(config.LogLevel),
Output: logOutput,
@@ -747,6 +778,8 @@ WAIT:
select {
case s := <-signalCh:
sig = s
case <-winsvc.ShutdownChannel():
sig = os.Interrupt
case <-c.ShutdownCh:
sig = os.Interrupt
case <-c.retryJoinErrCh:

View File

@@ -7,6 +7,7 @@ import (
"io"
"net"
"os"
"os/exec"
"os/user"
"path/filepath"
"runtime"
@@ -56,6 +57,18 @@ type Config struct {
// LogJson enables log output in a JSON format
LogJson bool `hcl:"log_json"`
// LogFile enables logging to a file
LogFile string `hcl:"log_file"`
// LogRotateDuration is the time period that logs should be rotated in
LogRotateDuration string `hcl:"log_rotate_duration"`
// LogRotateBytes is the max number of bytes that should be written to a file
LogRotateBytes int `hcl:"log_rotate_bytes"`
// LogRotateMaxFiles is the max number of log files to keep
LogRotateMaxFiles int `hcl:"log_rotate_max_files"`
// BindAddr is the address on which all of nomad's services will
// be bound. If not specified, this defaults to 127.0.0.1.
BindAddr string `hcl:"bind_addr"`
@@ -705,6 +718,10 @@ func newDevModeConfig(devMode, connectMode bool) (*devModeConfig, error) {
return nil, fmt.Errorf(
"-dev-connect uses network namespaces and is only supported for root.")
}
// Ensure Consul is on PATH
if _, err := exec.LookPath("consul"); err != nil {
return nil, fmt.Errorf("-dev-connect requires a 'consul' binary in Nomad's $PATH")
}
mode.connectMode = true
}
err := mode.networkConfig()
@@ -893,6 +910,18 @@ func (c *Config) Merge(b *Config) *Config {
if b.LogJson {
result.LogJson = true
}
if b.LogFile != "" {
result.LogFile = b.LogFile
}
if b.LogRotateDuration != "" {
result.LogRotateDuration = b.LogRotateDuration
}
if b.LogRotateBytes != 0 {
result.LogRotateBytes = b.LogRotateBytes
}
if b.LogRotateMaxFiles != 0 {
result.LogRotateMaxFiles = b.LogRotateMaxFiles
}
if b.BindAddr != "" {
result.BindAddr = b.BindAddr
}
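As elsewhere in Merge, an empty or zero value in the override leaves the receiver's value in place; a hedged example of the resulting behavior for the new log fields:

    base := &Config{LogFile: "/var/log/nomad.log", LogRotateMaxFiles: 3}
    override := &Config{LogRotateBytes: 10 << 20} // LogFile unset, so base's path survives
    merged := base.Merge(override)
    // merged.LogFile           == "/var/log/nomad.log"
    // merged.LogRotateBytes    == 10485760
    // merged.LogRotateMaxFiles == 3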

View File

@@ -19,6 +19,7 @@ var basicConfig = &Config{
NodeName: "my-web",
DataDir: "/tmp/nomad",
PluginDir: "/tmp/nomad-plugins",
LogFile: "/var/log/nomad.log",
LogLevel: "ERR",
LogJson: true,
BindAddr: "192.168.0.1",
@@ -409,14 +410,10 @@ func TestConfig_Parse(t *testing.T) {
t.Run(tc.File, func(t *testing.T) {
require := require.New(t)
path, err := filepath.Abs(filepath.Join("./testdata", tc.File))
if err != nil {
t.Fatalf("file: %s\n\n%s", tc.File, err)
}
require.NoError(err)
actual, err := ParseConfigFile(path)
if (err != nil) != tc.Err {
t.Fatalf("file: %s\n\n%s", tc.File, err)
}
require.NoError(err)
// ParseConfig used to re-merge defaults for these three objects,
// despite them already being merged in LoadConfig. The test structs

View File

@@ -22,8 +22,8 @@ type ChecksAPI interface {
Checks() (map[string]*api.AgentCheck, error)
}
// TaskRestarter allows the checkWatcher to restart tasks.
type TaskRestarter interface {
// WorkloadRestarter allows the checkWatcher to restart tasks or entire task groups.
type WorkloadRestarter interface {
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}
@@ -35,7 +35,7 @@ type checkRestart struct {
checkName string
taskKey string // composite of allocID + taskName for uniqueness
task TaskRestarter
task WorkloadRestarter
grace time.Duration
interval time.Duration
timeLimit time.Duration
@@ -114,7 +114,7 @@ func (c *checkRestart) apply(ctx context.Context, now time.Time, status string)
// asyncRestart mimics the pre-0.9 TaskRunner.Restart behavior and is intended
// to be called in a goroutine.
func asyncRestart(ctx context.Context, logger log.Logger, task TaskRestarter, event *structs.TaskEvent) {
func asyncRestart(ctx context.Context, logger log.Logger, task WorkloadRestarter, event *structs.TaskEvent) {
// Check watcher restarts are always failures
const failure = true
@@ -292,7 +292,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
}
// Watch a check and restart its task if unhealthy.
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter TaskRestarter) {
func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.ServiceCheck, restarter WorkloadRestarter) {
if !check.TriggersRestarts() {
// Not watched, noop
return

View File

@@ -105,10 +105,8 @@ func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.Agen
// operations are submitted to the main loop via commit() for synchronizing
// with Consul.
type operations struct {
regServices []*api.AgentServiceRegistration
regChecks []*api.AgentCheckRegistration
scripts []*scriptCheck
regServices []*api.AgentServiceRegistration
regChecks []*api.AgentCheckRegistration
deregServices []string
deregChecks []string
}
@@ -117,12 +115,12 @@ type operations struct {
// allocations by task.
type AllocRegistration struct {
// Tasks maps the name of a task to its registered services and checks
Tasks map[string]*TaskRegistration
Tasks map[string]*ServiceRegistrations
}
func (a *AllocRegistration) copy() *AllocRegistration {
c := &AllocRegistration{
Tasks: make(map[string]*TaskRegistration, len(a.Tasks)),
Tasks: make(map[string]*ServiceRegistrations, len(a.Tasks)),
}
for k, v := range a.Tasks {
@@ -166,14 +164,14 @@ func (a *AllocRegistration) NumChecks() int {
return total
}
// TaskRegistration holds the status of services registered for a particular
// task.
type TaskRegistration struct {
// ServiceRegistrations holds the status of services registered for a particular
// task or task group.
type ServiceRegistrations struct {
Services map[string]*ServiceRegistration
}
func (t *TaskRegistration) copy() *TaskRegistration {
c := &TaskRegistration{
func (t *ServiceRegistrations) copy() *ServiceRegistrations {
c := &ServiceRegistrations{
Services: make(map[string]*ServiceRegistration, len(t.Services)),
}
@@ -230,10 +228,8 @@ type ServiceClient struct {
opCh chan *operations
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle
services map[string]*api.AgentServiceRegistration
checks map[string]*api.AgentCheckRegistration
explicitlyDeregisteredServices map[string]bool
explicitlyDeregisteredChecks map[string]bool
@@ -284,8 +280,6 @@ func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bo
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
explicitlyDeregisteredServices: make(map[string]bool),
explicitlyDeregisteredChecks: make(map[string]bool),
allocRegistrations: make(map[string]*AllocRegistration),
@@ -439,25 +433,16 @@ func (c *ServiceClient) merge(ops *operations) {
for _, check := range ops.regChecks {
c.checks[check.ID] = check
}
for _, s := range ops.scripts {
c.scripts[s.id] = s
}
for _, sid := range ops.deregServices {
delete(c.services, sid)
c.explicitlyDeregisteredServices[sid] = true
}
for _, cid := range ops.deregChecks {
if script, ok := c.runningScripts[cid]; ok {
script.cancel()
delete(c.scripts, cid)
delete(c.runningScripts, cid)
}
delete(c.checks, cid)
c.explicitlyDeregisteredChecks[cid] = true
}
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts)))
}
// sync enqueued operations.
@@ -593,16 +578,6 @@ func (c *ServiceClient) sync() error {
}
creg++
metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1)
// Handle starting scripts
if script, ok := c.scripts[id]; ok {
// If it's already running, cancel and replace
if oldScript, running := c.runningScripts[id]; running {
oldScript.cancel()
}
// Start and store the handle
c.runningScripts[id] = script.run()
}
}
// Only log if something was actually synced
@@ -649,7 +624,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
ops.regServices = append(ops.regServices, serviceReg)
for _, check := range service.Checks {
checkID := makeCheckID(id, check)
checkID := MakeCheckID(id, check)
if check.Type == structs.ServiceCheckScript {
return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name)
}
@@ -700,11 +675,11 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
// serviceRegs creates service registrations and check registrations from a
// service. It returns a service registration object with the
// service and check IDs populated.
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, task *TaskServices) (
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, workload *WorkloadServices) (
*ServiceRegistration, error) {
// Get the services ID
id := MakeTaskServiceID(task.AllocID, task.Name, service)
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
sreg := &ServiceRegistration{
serviceID: id,
checkIDs: make(map[string]struct{}, len(service.Checks)),
@@ -717,14 +692,14 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
}
// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, task.Networks, task.DriverNetwork)
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
// Determine whether to use tags or canary_tags
var tags []string
if task.Canary && len(service.CanaryTags) > 0 {
if workload.Canary && len(service.CanaryTags) > 0 {
tags = make([]string, len(service.CanaryTags))
copy(tags, service.CanaryTags)
} else {
@@ -733,7 +708,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
}
// newConnect returns (nil, nil) if there's no Connect-enabled service.
connect, err := newConnect(service.Name, service.Connect, task.Networks)
connect, err := newConnect(service.Name, service.Connect, workload.Networks)
if err != nil {
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
}
@@ -759,7 +734,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
ops.regServices = append(ops.regServices, serviceReg)
// Build the check registrations
checkIDs, err := c.checkRegs(ops, id, service, task)
checkIDs, err := c.checkRegs(ops, id, service, workload)
if err != nil {
return nil, err
}
@@ -772,7 +747,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, t
// checkRegs registers the checks for the given service and returns the
// registered check ids.
func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *structs.Service,
task *TaskServices) ([]string, error) {
workload *WorkloadServices) ([]string, error) {
// Fast path
numChecks := len(service.Checks)
@@ -782,17 +757,9 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
checkIDs := make([]string, 0, numChecks)
for _, check := range service.Checks {
checkID := makeCheckID(serviceID, check)
checkID := MakeCheckID(serviceID, check)
checkIDs = append(checkIDs, checkID)
if check.Type == structs.ServiceCheckScript {
if task.DriverExec == nil {
return nil, fmt.Errorf("driver doesn't support script checks")
}
sc := newScriptCheck(task.AllocID, task.Name, checkID, check, task.DriverExec,
c.client, c.logger, c.shutdownCh)
ops.scripts = append(ops.scripts, sc)
// Skip getAddress for script checks
checkReg, err := createCheckReg(serviceID, checkID, check, "", 0)
if err != nil {
@@ -815,7 +782,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
addrMode = structs.AddressModeHost
}
ip, port, err := getAddress(addrMode, portLabel, task.Networks, task.DriverNetwork)
ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
@@ -829,179 +796,67 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
return checkIDs, nil
}
//TODO(schmichael) remove
type noopRestarter struct{}
func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil }
// makeAllocTaskServices creates a TaskServices struct for a group service.
//
//TODO(schmichael) rename TaskServices and refactor this into a New method
func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) {
if n := len(alloc.AllocatedResources.Shared.Networks); n == 0 {
return nil, fmt.Errorf("unable to register a group service without a group network")
}
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
ts := &TaskServices{
AllocID: alloc.ID,
Name: "group-" + alloc.TaskGroup,
Services: tg.Services,
Networks: alloc.AllocatedResources.Shared.Networks,
//TODO(schmichael) there's probably a better way than hacking driver network
DriverNetwork: &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
},
// unsupported for group services
Restarter: noopRestarter{},
DriverExec: nil,
}
if alloc.DeploymentStatus != nil {
ts.Canary = alloc.DeploymentStatus.Canary
}
return ts, nil
}
// RegisterGroup services with Consul. Adds all task group-level service
// entries and checks to Consul.
func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
}
if len(tg.Services) == 0 {
// noop
return nil
}
ts, err := makeAllocTaskServices(alloc, tg)
if err != nil {
return err
}
return c.RegisterTask(ts)
}
// UpdateGroup services with Consul. Updates all task group-level service
// entries and checks to Consul.
func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error {
oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup)
if oldTG == nil {
return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup)
}
oldServices, err := makeAllocTaskServices(oldAlloc, oldTG)
if err != nil {
return err
}
newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup)
if newTG == nil {
return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup)
}
newServices, err := makeAllocTaskServices(newAlloc, newTG)
if err != nil {
return err
}
return c.UpdateTask(oldServices, newServices)
}
// RemoveGroup services with Consul. Removes all task group-level service
// entries and checks from Consul.
func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
}
if len(tg.Services) == 0 {
// noop
return nil
}
ts, err := makeAllocTaskServices(alloc, tg)
if err != nil {
return err
}
c.RemoveTask(ts)
return nil
}
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
// exec is nil and a script check exists an error is returned.
// RegisterWorkload with Consul. Adds all service entries and checks to Consul.
//
// If the service IP is set it is used as the address in the service registration.
// Checks will always use the IP from the Task struct (host's IP).
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterTask(task *TaskServices) error {
func (c *ServiceClient) RegisterWorkload(workload *WorkloadServices) error {
// Fast path
numServices := len(task.Services)
numServices := len(workload.Services)
if numServices == 0 {
return nil
}
t := new(TaskRegistration)
t := new(ServiceRegistrations)
t.Services = make(map[string]*ServiceRegistration, numServices)
ops := &operations{}
for _, service := range task.Services {
sreg, err := c.serviceRegs(ops, service, task)
for _, service := range workload.Services {
sreg, err := c.serviceRegs(ops, service, workload)
if err != nil {
return err
}
t.Services[sreg.serviceID] = sreg
}
// Add the task to the allocation's registration
c.addTaskRegistration(task.AllocID, task.Name, t)
// Add the workload to the allocation's registration
c.addRegistrations(workload.AllocID, workload.Name(), t)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range task.Services {
serviceID := MakeTaskServiceID(task.AllocID, task.Name, service)
for _, service := range workload.Services {
serviceID := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter)
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter)
}
}
}
return nil
}
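Callers now build a WorkloadServices, for either a task or a whole task group, and hand it to the client; a hedged sketch mirroring the test usage later in this diff (BuildAllocServices and NoopRestarter appear in the updated tests):

    workload := BuildAllocServices(node, alloc, NoopRestarter())
    if err := serviceClient.RegisterWorkload(workload); err != nil {
        return err
    }
    // ...and on teardown:
    serviceClient.RemoveWorkload(workload)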
// UpdateTask in Consul. Does not alter the service if only checks have
// UpdateWorkload in Consul. Does not alter the service if only checks have
// changed.
//
// DriverNetwork must not change between invocations for the same allocation.
func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
func (c *ServiceClient) UpdateWorkload(old, newWorkload *WorkloadServices) error {
ops := &operations{}
taskReg := new(TaskRegistration)
taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
regs := new(ServiceRegistrations)
regs.Services = make(map[string]*ServiceRegistration, len(newWorkload.Services))
existingIDs := make(map[string]*structs.Service, len(old.Services))
for _, s := range old.Services {
existingIDs[MakeTaskServiceID(old.AllocID, old.Name, s)] = s
existingIDs[MakeAllocServiceID(old.AllocID, old.Name(), s)] = s
}
newIDs := make(map[string]*structs.Service, len(newTask.Services))
for _, s := range newTask.Services {
newIDs[MakeTaskServiceID(newTask.AllocID, newTask.Name, s)] = s
newIDs := make(map[string]*structs.Service, len(newWorkload.Services))
for _, s := range newWorkload.Services {
newIDs[MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
}
// Loop over existing Service IDs to see if they have been removed
@@ -1012,7 +867,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Existing service entry removed
ops.deregServices = append(ops.deregServices, existingID)
for _, check := range existingSvc.Checks {
cid := makeCheckID(existingID, check)
cid := MakeCheckID(existingID, check)
ops.deregChecks = append(ops.deregChecks, cid)
// Unwatch watched checks
@@ -1023,8 +878,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
continue
}
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
oldHash := existingSvc.Hash(old.AllocID, old.Name(), old.Canary)
newHash := newSvc.Hash(newWorkload.AllocID, newWorkload.Name(), newWorkload.Canary)
if oldHash == newHash {
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
@@ -1035,17 +890,17 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
}
taskReg.Services[existingID] = sreg
regs.Services[existingID] = sreg
// See if any checks were updated
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
for _, check := range existingSvc.Checks {
existingChecks[makeCheckID(existingID, check)] = check
existingChecks[MakeCheckID(existingID, check)] = check
}
// Register new checks
for _, check := range newSvc.Checks {
checkID := makeCheckID(existingID, check)
checkID := MakeCheckID(existingID, check)
if _, exists := existingChecks[checkID]; exists {
// Check is still required. Remove it from the map so it doesn't get
// deleted later.
@@ -1054,7 +909,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
}
// New check on an unchanged service; add them now
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newTask)
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newWorkload)
if err != nil {
return err
}
@@ -1065,7 +920,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Update all watched checks as CheckRestart fields aren't part of ID
if check.TriggersRestarts() {
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
}
}
@@ -1082,45 +937,45 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
// Any remaining services should just be enqueued directly
for _, newSvc := range newIDs {
sreg, err := c.serviceRegs(ops, newSvc, newTask)
sreg, err := c.serviceRegs(ops, newSvc, newWorkload)
if err != nil {
return err
}
taskReg.Services[sreg.serviceID] = sreg
regs.Services[sreg.serviceID] = sreg
}
// Add the task to the allocation's registration
c.addTaskRegistration(newTask.AllocID, newTask.Name, taskReg)
c.addRegistrations(newWorkload.AllocID, newWorkload.Name(), regs)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range newIDs {
serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service)
serviceID := MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := makeCheckID(serviceID, check)
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
}
}
}
return nil
}
// RemoveTask from Consul. Removes all service entries and checks.
// RemoveWorkload from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RemoveTask(task *TaskServices) {
func (c *ServiceClient) RemoveWorkload(workload *WorkloadServices) {
ops := operations{}
for _, service := range task.Services {
id := MakeTaskServiceID(task.AllocID, task.Name, service)
for _, service := range workload.Services {
id := MakeAllocServiceID(workload.AllocID, workload.Name(), service)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
cid := makeCheckID(id, check)
cid := MakeCheckID(id, check)
ops.deregChecks = append(ops.deregChecks, cid)
if check.TriggersRestarts() {
@@ -1129,8 +984,8 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) {
}
}
// Remove the task from the alloc's registrations
c.removeTaskRegistration(task.AllocID, task.Name)
// Remove the workload from the alloc's registrations
c.removeRegistration(workload.AllocID, workload.Name())
// Now add them to the deregistration fields; main Run loop will update
c.commit(&ops)
@@ -1177,6 +1032,12 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration,
return reg, nil
}
// UpdateTTL is used to update the TTL of a check. Typically this will only be
// called to heartbeat script checks.
func (c *ServiceClient) UpdateTTL(id, output, status string) error {
return c.client.UpdateTTL(id, output, status)
}
// Shutdown the Consul client. Update running task registrations and deregister
// agent from Consul. On first call blocks up to shutdownWait before giving up
// on syncing operations.
@@ -1220,34 +1081,26 @@ func (c *ServiceClient) Shutdown() error {
}
}
// Give script checks time to exit (no need to lock as Run() has exited)
for _, h := range c.runningScripts {
select {
case <-h.wait():
case <-deadline:
return fmt.Errorf("timed out waiting for script checks to run")
}
}
return nil
}
// addTaskRegistration adds the task registration for the given allocation.
func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) {
// addRegistrations adds the service registrations for the given allocation.
func (c *ServiceClient) addRegistrations(allocID, taskName string, reg *ServiceRegistrations) {
c.allocRegistrationsLock.Lock()
defer c.allocRegistrationsLock.Unlock()
alloc, ok := c.allocRegistrations[allocID]
if !ok {
alloc = &AllocRegistration{
Tasks: make(map[string]*TaskRegistration),
Tasks: make(map[string]*ServiceRegistrations),
}
c.allocRegistrations[allocID] = alloc
}
alloc.Tasks[taskName] = reg
}
// removeTaskRegistration removes the task registration for the given allocation.
func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
// removeRegistration removes the registrations for the given allocation.
func (c *ServiceClient) removeRegistration(allocID, taskName string) {
c.allocRegistrationsLock.Lock()
defer c.allocRegistrationsLock.Unlock()
@@ -1277,18 +1130,18 @@ func makeAgentServiceID(role string, service *structs.Service) string {
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
}
// MakeTaskServiceID creates a unique ID for identifying a task service in
// MakeAllocServiceID creates a unique ID for identifying an alloc service in
// Consul.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
func MakeTaskServiceID(allocID, taskName string, service *structs.Service) string {
func MakeAllocServiceID(allocID, taskName string, service *structs.Service) string {
return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
}
// makeCheckID creates a unique ID for a check.
// MakeCheckID creates a unique ID for a check.
//
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
func MakeCheckID(serviceID string, check *structs.ServiceCheck) string {
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
}
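Exporting both helpers lets other packages derive the same deterministic IDs; a hedged sketch of composing them:

    serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, service)
    // e.g. _nomad-task-<alloc-id>-<name>-<service-name>-<port-label>
    checkID := MakeCheckID(serviceID, check)
    // e.g. _nomad-check-<hash-of-service-id-and-check>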
@@ -1494,21 +1347,30 @@ func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs.
// Bind to netns IP(s):port
proxyConfig := map[string]interface{}{}
if nc.SidecarService.Proxy != nil && nc.SidecarService.Proxy.Config != nil {
proxyConfig = nc.SidecarService.Proxy.Config
localServiceAddress := ""
localServicePort := 0
if nc.SidecarService.Proxy != nil {
localServiceAddress = nc.SidecarService.Proxy.LocalServiceAddress
localServicePort = nc.SidecarService.Proxy.LocalServicePort
if nc.SidecarService.Proxy.Config != nil {
proxyConfig = nc.SidecarService.Proxy.Config
}
}
proxyConfig["bind_address"] = "0.0.0.0"
proxyConfig["bind_port"] = port.To
// Advertise host IP:port
cc.SidecarService = &api.AgentServiceRegistration{
Tags: helper.CopySliceString(nc.SidecarService.Tags),
Address: net.IP,
Port: port.Value,
// Automatically configure the proxy to bind to all addresses
// within the netns.
Proxy: &api.AgentServiceConnectProxyConfig{
Config: proxyConfig,
LocalServiceAddress: localServiceAddress,
LocalServicePort: localServicePort,
Config: proxyConfig,
},
}
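With this change a jobspec's sidecar proxy settings flow through to the registration instead of being dropped; a hedged sketch of the input shape that the updated test below exercises:

    nc := &structs.ConsulConnect{
        SidecarService: &structs.ConsulSidecarService{
            Proxy: &structs.ConsulProxy{
                LocalServicePort: 9000, // copied into Proxy.LocalServicePort
            },
        },
    }
    // newConnect still forces bind_address/bind_port into the proxy Config,
    // but now preserves LocalServiceAddress and LocalServicePort.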

View File

@@ -64,18 +64,25 @@ func TestConsul_Connect(t *testing.T) {
{
Name: "testconnect",
PortLabel: "9999",
Meta: map[string]string{
"alloc_id": "${NOMAD_ALLOC_ID}",
},
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
SidecarService: &structs.ConsulSidecarService{
Proxy: &structs.ConsulProxy{
LocalServicePort: 9000,
},
},
},
},
}
// required by isNomadSidecar assertion below
serviceRegMap := map[string]*api.AgentServiceRegistration{
MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]): nil,
MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0]): nil,
}
require.NoError(t, serviceClient.RegisterGroup(alloc))
require.NoError(t, serviceClient.RegisterWorkload(BuildAllocServices(mock.Node(), alloc, NoopRestarter())))
require.Eventually(t, func() bool {
services, err := consulClient.Agent().Services()
@@ -90,7 +97,7 @@ func TestConsul_Connect(t *testing.T) {
require.NoError(t, err)
require.Len(t, services, 2)
serviceID := MakeTaskServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
serviceID := MakeAllocServiceID(alloc.ID, "group-"+alloc.TaskGroup, tg.Services[0])
connectID := serviceID + "-sidecar-proxy"
require.Contains(t, services, serviceID)
@@ -114,11 +121,12 @@ func TestConsul_Connect(t *testing.T) {
require.Equal(t, connectService.Proxy.DestinationServiceName, "testconnect")
require.Equal(t, connectService.Proxy.DestinationServiceID, serviceID)
require.Equal(t, connectService.Proxy.LocalServiceAddress, "127.0.0.1")
require.Equal(t, connectService.Proxy.LocalServicePort, 9999)
require.Equal(t, connectService.Proxy.LocalServicePort, 9000)
require.Equal(t, connectService.Proxy.Config, map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": float64(9998),
})
require.Equal(t, alloc.ID, agentService.Meta["alloc_id"])
time.Sleep(interval >> 2)
}

View File

@@ -1,215 +0,0 @@
package consul
import (
"context"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
)
// heartbeater is the subset of consul agent functionality needed by script
// checks to heartbeat
type heartbeater interface {
UpdateTTL(id, output, status string) error
}
// contextExec allows canceling a ScriptExecutor with a context.
type contextExec struct {
// pctx is the parent context. A subcontext will be created with Exec's
// timeout.
pctx context.Context
// exec to be wrapped in a context
exec interfaces.ScriptExecutor
}
func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec {
return &contextExec{
pctx: ctx,
exec: exec,
}
}
type execResult struct {
buf []byte
code int
err error
}
// Exec a command until the timeout expires, the context is canceled, or the
// underlying Exec returns.
func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
resCh := make(chan execResult, 1)
// Don't trust the underlying implementation to obey timeout
ctx, cancel := context.WithTimeout(c.pctx, timeout)
defer cancel()
go func() {
output, code, err := c.exec.Exec(timeout, cmd, args)
select {
case resCh <- execResult{output, code, err}:
case <-ctx.Done():
}
}()
select {
case res := <-resCh:
return res.buf, res.code, res.err
case <-ctx.Done():
return nil, 0, ctx.Err()
}
}
// scriptHandle is returned by scriptCheck.run and allows cancelling a
// scriptCheck and waiting for it to shut down.
type scriptHandle struct {
// cancel the script
cancel func()
exitCh chan struct{}
}
// wait returns a chan that's closed when the script exits
func (s *scriptHandle) wait() <-chan struct{} {
return s.exitCh
}
// scriptCheck runs script checks via a ScriptExecutor and updates the
// appropriate check's TTL when the script succeeds.
type scriptCheck struct {
allocID string
taskName string
id string
check *structs.ServiceCheck
exec interfaces.ScriptExecutor
agent heartbeater
// lastCheckOk is true if the last check was ok; otherwise false
lastCheckOk bool
logger log.Logger
shutdownCh <-chan struct{}
}
// newScriptCheck creates a new scriptCheck. run() should be called once the
// initial check is registered with Consul.
func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceCheck,
exec interfaces.ScriptExecutor, agent heartbeater, logger log.Logger,
shutdownCh <-chan struct{}) *scriptCheck {
logger = logger.ResetNamed("consul.checks").With("task", taskName, "alloc_id", allocID, "check", check.Name)
return &scriptCheck{
allocID: allocID,
taskName: taskName,
id: checkID,
check: check,
exec: exec,
agent: agent,
lastCheckOk: true, // start logging on first failure
logger: logger,
shutdownCh: shutdownCh,
}
}
// run this script check and return its cancel func. If the shutdownCh is
// closed the check will be run once more before exiting.
func (s *scriptCheck) run() *scriptHandle {
ctx, cancel := context.WithCancel(context.Background())
exitCh := make(chan struct{})
// Wrap the original ScriptExecutor in one that obeys context
// cancelation.
ctxExec := newContextExec(ctx, s.exec)
go func() {
defer close(exitCh)
timer := time.NewTimer(0)
defer timer.Stop()
for {
// Block until check is removed, Nomad is shutting
// down, or the check interval is up
select {
case <-ctx.Done():
// check has been removed
return
case <-s.shutdownCh:
// unblock but don't exit until after we heartbeat once more
case <-timer.C:
timer.Reset(s.check.Interval)
}
metrics.IncrCounter([]string{"client", "consul", "script_runs"}, 1)
// Execute check script with timeout
output, code, err := ctxExec.Exec(s.check.Timeout, s.check.Command, s.check.Args)
switch err {
case context.Canceled:
// check removed during execution; exit
return
case context.DeadlineExceeded:
metrics.IncrCounter([]string{"client", "consul", "script_timeouts"}, 1)
// If no error was returned, set one to make sure the task goes critical
if err == nil {
err = context.DeadlineExceeded
}
// Log deadline exceeded every time as it's a
// distinct issue from checks returning
// failures
s.logger.Warn("check timed out", "timeout", s.check.Timeout)
}
state := api.HealthCritical
switch code {
case 0:
state = api.HealthPassing
case 1:
state = api.HealthWarning
}
var outputMsg string
if err != nil {
state = api.HealthCritical
outputMsg = err.Error()
} else {
outputMsg = string(output)
}
// Actually heartbeat the check
err = s.agent.UpdateTTL(s.id, outputMsg, state)
select {
case <-ctx.Done():
// check has been removed; don't report errors
return
default:
}
if err != nil {
if s.lastCheckOk {
s.lastCheckOk = false
s.logger.Warn("updating check failed", "error", err)
} else {
s.logger.Debug("updating check still failing", "error", err)
}
} else if !s.lastCheckOk {
// Succeeded for the first time or after failing; log
s.lastCheckOk = true
s.logger.Info("updating check succeeded")
}
select {
case <-s.shutdownCh:
// We've been told to exit and just heartbeated so exit
return
default:
}
}
}()
return &scriptHandle{cancel: cancel, exitCh: exitCh}
}

View File

@@ -1,309 +0,0 @@
package consul
import (
"context"
"fmt"
"os"
"os/exec"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
}
}
// blockingScriptExec implements ScriptExecutor by running a subcommand
// that never exits.
type blockingScriptExec struct {
// pctx is canceled *only* for test cleanup. Just like real
// ScriptExecutors its Exec method cannot be canceled directly -- only
// with a timeout.
pctx context.Context
// running is ticked before blocking to allow synchronizing operations
running chan struct{}
// exited is set to 1 atomically once Exec has returned
exited int32
}
// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the
// caller receives on the b.running chan. It also returns a CancelFunc for test
// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout
// expires.
func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
exec := &blockingScriptExec{
pctx: ctx,
running: make(chan struct{}),
}
return exec, cancel
}
func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) {
b.running <- struct{}{}
ctx, cancel := context.WithTimeout(b.pctx, dur)
defer cancel()
cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h")
testtask.SetCmdEnv(cmd)
err := cmd.Run()
code := 0
if exitErr, ok := err.(*exec.ExitError); ok {
if !exitErr.Success() {
code = 1
}
}
atomic.StoreInt32(&b.exited, 1)
return []byte{}, code, err
}
// TestConsulScript_Exec_Cancel asserts cancelling a script check shortcircuits
// any running scripts.
func TestConsulScript_Exec_Cancel(t *testing.T) {
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Hour,
}
exec, cancel := newBlockingScriptExec()
defer cancel()
// pass nil for heartbeater as it shouldn't be called
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil)
handle := check.run()
// wait until Exec is called
<-exec.running
// cancel now that we're blocked in exec
handle.cancel()
select {
case <-handle.wait():
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (blockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
}
type execStatus struct {
checkID string
output string
status string
}
// fakeHeartbeater implements the heartbeater interface to allow mocking out
// Consul in script executor tests.
type fakeHeartbeater struct {
updates chan execStatus
}
func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error {
f.updates <- execStatus{checkID: checkID, output: output, status: status}
return nil
}
func newFakeHeartbeater() *fakeHeartbeater {
return &fakeHeartbeater{updates: make(chan execStatus)}
}
// TestConsulScript_Exec_TimeoutBasic asserts a script will be killed when the
// timeout is reached.
func TestConsulScript_Exec_TimeoutBasic(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Second,
}
exec, cancel := newBlockingScriptExec()
defer cancel()
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
<-exec.running
// Check for UpdateTTL call
select {
case update := <-hb.updates:
if update.status != api.HealthCritical {
t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
// The underlying ScriptExecutor (blockingScriptExec) *cannot* be
// canceled. Only a wrapper around it obeys the context cancelation.
if atomic.LoadInt32(&exec.exited) == 1 {
t.Errorf("expected script executor to still be running after timeout")
}
// Cancel and watch for exit
handle.cancel()
select {
case <-handle.wait():
// ok!
case update := <-hb.updates:
t.Errorf("unexpected UpdateTTL call on exit with status=%q", update)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions
type sleeperExec struct{}
func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
time.Sleep(100 * time.Millisecond)
return []byte{}, 0, nil
}
// TestConsulScript_Exec_TimeoutCritical asserts a script will be killed when
// the timeout is reached and that a critical status is always set regardless
// of what Exec returns.
func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: time.Nanosecond,
}
hb := newFakeHeartbeater()
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.HCLogger(t), nil)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
// Check for UpdateTTL call
select {
case update := <-hb.updates:
if update.status != api.HealthCritical {
t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update)
}
if update.output != context.DeadlineExceeded.Error() {
t.Errorf("expected output=%q but found: %q", context.DeadlineExceeded.Error(), update.output)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to timeout")
}
}
// simpleExec is a fake ScriptExecutor that returns whatever is specified.
type simpleExec struct {
code int
err error
}
func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err
}
// newSimpleExec creates a new ScriptExecutor that returns the given code and err.
func newSimpleExec(code int, err error) simpleExec {
return simpleExec{code: code, err: err}
}
// TestConsulScript_Exec_Shutdown asserts a script will be executed once more
// when told to shutdown.
func TestConsulScript_Exec_Shutdown(t *testing.T) {
serviceCheck := structs.ServiceCheck{
Name: "sleeper",
Interval: time.Hour,
Timeout: 3 * time.Second,
}
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(0, nil)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown)
handle := check.run()
defer handle.cancel() // just-in-case cleanup
// Tell scriptCheck to exit
close(shutdown)
select {
case update := <-hb.updates:
if update.status != api.HealthPassing {
t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
select {
case <-handle.wait():
// ok!
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exit")
}
}
func TestConsulScript_Exec_Codes(t *testing.T) {
run := func(code int, err error, expected string) func(t *testing.T) {
return func(t *testing.T) {
t.Parallel()
serviceCheck := structs.ServiceCheck{
Name: "test",
Interval: time.Hour,
Timeout: 3 * time.Second,
}
hb := newFakeHeartbeater()
shutdown := make(chan struct{})
exec := newSimpleExec(code, err)
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown)
handle := check.run()
defer handle.cancel()
select {
case update := <-hb.updates:
if update.status != expected {
t.Errorf("expected %q but received %q", expected, update)
}
// assert output is being reported
expectedOutput := fmt.Sprintf("code=%d err=%v", code, err)
if err != nil {
expectedOutput = err.Error()
}
if update.output != expectedOutput {
t.Errorf("expected output=%q but found: %q", expectedOutput, update.output)
}
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for script check to exec")
}
}
}
// Test exit codes with errors
t.Run("Passing", run(0, nil, api.HealthPassing))
t.Run("Warning", run(1, nil, api.HealthWarning))
t.Run("Critical-2", run(2, nil, api.HealthCritical))
t.Run("Critical-9000", run(9000, nil, api.HealthCritical))
// Errors should always cause Critical status
err := fmt.Errorf("test error")
t.Run("Error-0", run(0, err, api.HealthCritical))
t.Run("Error-1", run(1, err, api.HealthCritical))
t.Run("Error-2", run(2, err, api.HealthCritical))
t.Run("Error-9000", run(9000, err, api.HealthCritical))
}

View File

@@ -2,22 +2,28 @@ package consul
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
type TaskServices struct {
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be synchronized with Consul
type WorkloadServices struct {
AllocID string
// Name of the task
Name string
// Name of the task and task group the services are defined for. For
// group based services, Task will be empty
Task string
Group string
// Canary indicates whether or not the allocation is a canary
Canary bool
// Restarter allows restarting the task depending on the task's
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
Restarter TaskRestarter
Restarter WorkloadRestarter
// Services and checks to register for the task.
Services []*structs.Service
@@ -26,41 +32,49 @@ type TaskServices struct {
Networks structs.Networks
// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook
DriverExec interfaces.ScriptExecutor
// DriverNetwork is the network specified by the driver and may be nil.
DriverNetwork *drivers.DriverNetwork
}
func NewTaskServices(alloc *structs.Allocation, task *structs.Task, restarter TaskRestarter, exec interfaces.ScriptExecutor, net *drivers.DriverNetwork) *TaskServices {
ts := TaskServices{
AllocID: alloc.ID,
Name: task.Name,
Restarter: restarter,
Services: task.Services,
DriverExec: exec,
DriverNetwork: net,
func BuildAllocServices(node *structs.Node, alloc *structs.Allocation, restarter WorkloadRestarter) *WorkloadServices {
//TODO(schmichael) only support one network for now
net := alloc.AllocatedResources.Shared.Networks[0]
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
ws := &WorkloadServices{
AllocID: alloc.ID,
Group: alloc.TaskGroup,
Services: taskenv.InterpolateServices(taskenv.NewBuilder(node, alloc, nil, alloc.Job.Region).Build(), tg.Services),
Networks: alloc.AllocatedResources.Shared.Networks,
//TODO(schmichael) there's probably a better way than hacking driver network
DriverNetwork: &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
},
Restarter: restarter,
DriverExec: nil,
}
if alloc.AllocatedResources != nil {
if tr, ok := alloc.AllocatedResources.Tasks[task.Name]; ok {
ts.Networks = tr.Networks
}
} else if task.Resources != nil {
// COMPAT(0.11): Remove in 0.11
ts.Networks = task.Resources.Networks
if alloc.DeploymentStatus != nil {
ws.Canary = alloc.DeploymentStatus.Canary
}
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
ts.Canary = true
}
return &ts
return ws
}
// Copy method for easing tests
func (t *TaskServices) Copy() *TaskServices {
newTS := new(TaskServices)
func (t *WorkloadServices) Copy() *WorkloadServices {
newTS := new(WorkloadServices)
*newTS = *t
// Deep copy Services
@@ -70,3 +84,11 @@ func (t *TaskServices) Copy() *TaskServices {
}
return newTS
}
func (w *WorkloadServices) Name() string {
if w.Task != "" {
return w.Task
}
return "group-" + w.Group
}
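// For example (illustrative): a task named "web" yields Name() == "web",
// while group-only services in task group "cache" yield "group-cache".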

View File

@@ -0,0 +1,17 @@
package consul
import (
"context"
"github.com/hashicorp/nomad/nomad/structs"
)
func NoopRestarter() WorkloadRestarter {
return noopRestarter{}
}
type noopRestarter struct{}
func (noopRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
return nil
}
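// NoopRestarter is useful in tests or for services without check_restart
// stanzas, e.g. (illustrative):
//
//	ws := BuildAllocServices(node, alloc, NoopRestarter())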

File diff suppressed because it is too large

View File

@@ -17,12 +17,12 @@ import (
)
var (
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
taskNotPresentErr = fmt.Errorf("must provide task name")
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
invalidOrigin = fmt.Errorf("origin must be start or end")
allocIDNotPresentErr = CodedError(400, "must provide a valid alloc id")
fileNameNotPresentErr = CodedError(400, "must provide a file name")
taskNotPresentErr = CodedError(400, "must provide task name")
logTypeNotPresentErr = CodedError(400, "must provide log type (stdout/stderr)")
clientNotRunning = CodedError(400, "node is not running a Nomad Client")
invalidOrigin = CodedError(400, "origin must be start or end")
)
func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -273,13 +273,13 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
if followStr := q.Get("follow"); followStr != "" {
if follow, err = strconv.ParseBool(followStr); err != nil {
return nil, fmt.Errorf("failed to parse follow field to boolean: %v", err)
return nil, CodedError(400, fmt.Sprintf("failed to parse follow field to boolean: %v", err))
}
}
if plainStr := q.Get("plain"); plainStr != "" {
if plain, err = strconv.ParseBool(plainStr); err != nil {
return nil, fmt.Errorf("failed to parse plain field to boolean: %v", err)
return nil, CodedError(400, fmt.Sprintf("failed to parse plain field to boolean: %v", err))
}
}
@@ -295,7 +295,7 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
if offsetString != "" {
var err error
if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil {
return nil, fmt.Errorf("error parsing offset: %v", err)
return nil, CodedError(400, fmt.Sprintf("error parsing offset: %v", err))
}
}
@@ -388,10 +388,13 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter,
decoder.Reset(httpPipe)
if err := res.Error; err != nil {
code := 500
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
code = int(*err.Code)
}
errCh <- CodedError(code, err.Error())
return
}
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {

View File

@@ -3,7 +3,6 @@ package agent
import (
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@@ -12,6 +11,7 @@ import (
"time"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@@ -189,25 +189,26 @@ func TestHTTP_FS_Stream_MissingParams(t *testing.T) {
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
require.Nil(err)
require.NoError(err)
respW := httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.EqualError(err, allocIDNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo", nil)
require.Nil(err)
require.NoError(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.EqualError(err, fileNameNotPresentErr.Error())
req, err = http.NewRequest("GET", "/v1/client/fs/stream/foo?path=/path/to/file", nil)
require.Nil(err)
require.NoError(err)
respW = httptest.NewRecorder()
_, err = s.Server.Stream(respW, req)
require.Nil(err)
require.Error(err)
require.Contains(err.Error(), "alloc lookup failed")
})
}
@@ -219,38 +220,39 @@ func TestHTTP_FS_Logs_MissingParams(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
// AllocID Not Present
req, err := http.NewRequest("GET", "/v1/client/fs/logs/", nil)
require.Nil(err)
require.NoError(err)
respW := httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), allocIDNotPresentErr.Error())
require.Equal(500, respW.Code) // 500 for backward compat
require.Equal(400, respW.Code)
// Task Not Present
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo", nil)
require.Nil(err)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), taskNotPresentErr.Error())
require.Equal(500, respW.Code) // 500 for backward compat
require.Equal(400, respW.Code)
// Log Type Not Present
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo", nil)
require.Nil(err)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(respW.Body.String(), logTypeNotPresentErr.Error())
require.Equal(500, respW.Code) // 500 for backward compat
require.Equal(400, respW.Code)
// Ok
// case where all parameters are set but alloc isn't found
req, err = http.NewRequest("GET", "/v1/client/fs/logs/foo?task=foo&type=stdout", nil)
require.Nil(err)
require.NoError(err)
respW = httptest.NewRecorder()
s.Server.mux.ServeHTTP(respW, req)
require.Equal(200, respW.Code)
require.Equal(500, respW.Code)
require.Contains(respW.Body.String(), "alloc lookup failed")
})
}
@@ -354,8 +356,7 @@ func TestHTTP_FS_Stream_NoFollow(t *testing.T) {
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end&follow=false",
a.ID, offset)
p, _ := io.Pipe()
req, err := http.NewRequest("GET", path, p)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
doneCh := make(chan struct{})
@@ -383,8 +384,6 @@ func TestHTTP_FS_Stream_NoFollow(t *testing.T) {
case <-time.After(1 * time.Second):
t.Fatal("should close but did not")
}
p.Close()
})
}
@@ -401,9 +400,7 @@ func TestHTTP_FS_Stream_Follow(t *testing.T) {
path := fmt.Sprintf("/v1/client/fs/stream/%s?path=alloc/logs/web.stdout.0&offset=%d&origin=end",
a.ID, offset)
p, _ := io.Pipe()
req, err := http.NewRequest("GET", path, p)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := httptest.NewRecorder()
doneCh := make(chan struct{})
@@ -431,8 +428,6 @@ func TestHTTP_FS_Stream_Follow(t *testing.T) {
t.Fatal("shouldn't close")
case <-time.After(1 * time.Second):
}
p.Close()
})
}
@@ -448,8 +443,7 @@ func TestHTTP_FS_Logs(t *testing.T) {
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true",
a.ID, offset)
p, _ := io.Pipe()
req, err := http.NewRequest("GET", path, p)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
go func() {
@@ -469,8 +463,6 @@ func TestHTTP_FS_Logs(t *testing.T) {
}, func(err error) {
t.Fatal(err)
})
p.Close()
})
}
@@ -486,8 +478,7 @@ func TestHTTP_FS_Logs_Follow(t *testing.T) {
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true&follow=true",
a.ID, offset)
p, _ := io.Pipe()
req, err := http.NewRequest("GET", path, p)
req, err := http.NewRequest("GET", path, nil)
require.Nil(err)
respW := testutil.NewResponseRecorder()
errCh := make(chan error)
@@ -514,7 +505,23 @@ func TestHTTP_FS_Logs_Follow(t *testing.T) {
t.Fatalf("shouldn't exit: %v", err)
case <-time.After(1 * time.Second):
}
p.Close()
})
}
func TestHTTP_FS_Logs_PropagatesErrors(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=0&origin=end&plain=true",
uuid.Generate())
req, err := http.NewRequest("GET", path, nil)
require.NoError(t, err)
respW := testutil.NewResponseRecorder()
_, err = s.Server.Logs(respW, req)
require.Error(t, err)
_, ok := err.(HTTPCodedError)
require.Truef(t, ok, "expected a coded error but found: %#+v", err)
})
}

View File

@@ -183,6 +183,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/servers", s.wrap(s.AgentServersRequest))
s.mux.HandleFunc("/v1/agent/keyring/", s.wrap(s.KeyringOperationRequest))
s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest))
s.mux.HandleFunc("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.mux.HandleFunc("/v1/metrics", s.wrap(s.MetricsRequest))
@@ -212,7 +213,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
w.Write([]byte(stubHTML))
})
}
s.mux.Handle("/", handleRootRedirect())
s.mux.Handle("/", handleRootFallthrough())
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
@@ -274,10 +275,13 @@ func handleUI(h http.Handler) http.Handler {
})
}
func handleRootRedirect() http.Handler {
func handleRootFallthrough() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, "/ui/", 307)
return
if req.URL.Path == "/" {
http.Redirect(w, req, "/ui/", 307)
} else {
w.WriteHeader(http.StatusNotFound)
}
})
}
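// Behavior sketch (illustrative):
//
//	GET /          -> 307 redirect to /ui/
//	GET /anything  -> 404 Not Found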
@@ -300,6 +304,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
errMsg := err.Error()
if http, ok := err.(HTTPCodedError); ok {
code = http.Code()
} else if ecode, emsg, ok := structs.CodeFromRPCCodedErr(err); ok {
code = ecode
errMsg = emsg
} else {
// RPC errors get wrapped, so manually unwrap by only looking at their suffix
if strings.HasSuffix(errMsg, structs.ErrPermissionDenied.Error()) {

View File

@@ -12,7 +12,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"
@@ -21,6 +20,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
@@ -61,6 +61,58 @@ func BenchmarkHTTPRequests(b *testing.B) {
})
}
// TestRootFallthrough tests the root fallthrough handler to
// verify redirect and 404 behavior
func TestRootFallthrough(t *testing.T) {
t.Parallel()
cases := []struct {
desc string
path string
expectedPath string
expectedCode int
}{
{
desc: "unknown endpoint 404s",
path: "/v1/unknown/endpoint",
expectedCode: 404,
},
{
desc: "root path redirects to ui",
path: "/",
expectedPath: "/ui/",
expectedCode: 307,
},
}
s := makeHTTPServer(t, nil)
defer s.Shutdown()
// setup a client that doesn't follow redirects
client := &http.Client{
CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
return http.ErrUseLastResponse
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
reqURL := fmt.Sprintf("http://%s%s", s.Agent.config.AdvertiseAddrs.HTTP, tc.path)
resp, err := client.Get(reqURL)
require.NoError(t, err)
require.Equal(t, tc.expectedCode, resp.StatusCode)
if tc.expectedPath != "" {
loc, err := resp.Location()
require.NoError(t, err)
require.Equal(t, tc.expectedPath, loc.Path)
}
})
}
}
func TestSetIndex(t *testing.T) {
t.Parallel()
resp := httptest.NewRecorder()
@@ -525,23 +577,6 @@ func TestHTTP_VerifyHTTPSClient(t *testing.T) {
}
}
// assertIndex tests that X-Nomad-Index is set and non-zero
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
header := resp.Header().Get("X-Nomad-Index")
if header == "" || header == "0" {
t.Fatalf("Bad: %v", header)
}
}
// checkIndex is like assertIndex but returns an error
func checkIndex(resp *httptest.ResponseRecorder) error {
header := resp.Header().Get("X-Nomad-Index")
if header == "" || header == "0" {
return fmt.Errorf("Bad: %v", header)
}
return nil
}
func TestHTTP_VerifyHTTPSClient_AfterConfigReload(t *testing.T) {
t.Parallel()
assert := assert.New(t)
@@ -644,19 +679,6 @@ func TestHTTP_VerifyHTTPSClient_AfterConfigReload(t *testing.T) {
}
}
// getIndex parses X-Nomad-Index
func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 {
header := resp.Header().Get("X-Nomad-Index")
if header == "" {
t.Fatalf("Bad: %v", header)
}
val, err := strconv.Atoi(header)
if err != nil {
t.Fatalf("Bad: %v", header)
}
return uint64(val)
}
func httpTest(t testing.TB, cb func(c *Config), f func(srv *TestAgent)) {
s := makeHTTPServer(t, cb)
defer s.Shutdown()

View File

@@ -753,7 +753,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Name: v.Name,
Type: v.Type,
ReadOnly: v.ReadOnly,
Config: v.Config,
Source: v.Source,
}
tg.Volumes[k] = vol
@@ -812,9 +812,10 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
structsTask.VolumeMounts = make([]*structs.VolumeMount, l)
for i, mount := range apiTask.VolumeMounts {
structsTask.VolumeMounts[i] = &structs.VolumeMount{
Volume: mount.Volume,
Destination: mount.Destination,
ReadOnly: mount.ReadOnly,
Volume: *mount.Volume,
Destination: *mount.Destination,
ReadOnly: *mount.ReadOnly,
PropagationMode: *mount.PropagationMode,
}
}
}
@@ -1062,13 +1063,16 @@ func ApiConsulConnectToStructs(in *api.ConsulConnect) *structs.ConsulConnect {
if in.SidecarService != nil {
out.SidecarService = &structs.ConsulSidecarService{
Tags: helper.CopySliceString(in.SidecarService.Tags),
Port: in.SidecarService.Port,
}
if in.SidecarService.Proxy != nil {
out.SidecarService.Proxy = &structs.ConsulProxy{
Config: in.SidecarService.Proxy.Config,
LocalServiceAddress: in.SidecarService.Proxy.LocalServiceAddress,
LocalServicePort: in.SidecarService.Proxy.LocalServicePort,
Config: in.SidecarService.Proxy.Config,
}
upstreams := make([]structs.ConsulUpstream, len(in.SidecarService.Proxy.Upstreams))

View File

@@ -1537,6 +1537,13 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
TaskName: "task1",
},
},
Connect: &api.ConsulConnect{
Native: false,
SidecarService: &api.ConsulSidecarService{
Tags: []string{"f", "g"},
Port: "9000",
},
},
},
},
Tasks: []*api.Task{
@@ -1877,6 +1884,13 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
TaskName: "task1",
},
},
Connect: &structs.ConsulConnect{
Native: false,
SidecarService: &structs.ConsulSidecarService{
Tags: []string{"f", "g"},
Port: "9000",
},
},
},
},
Tasks: []*structs.Task{

command/agent/log_file.go Normal file
View File

@@ -0,0 +1,146 @@
package agent
import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/logutils"
)
var (
now = time.Now
)
// logFile is used to set up a file-based logger that also performs log rotation
type logFile struct {
// Log level filter to filter out logs that do not match the LogLevel criteria
logFilter *logutils.LevelFilter
// Name of the log file
fileName string
// Path to the log file
logPath string
// Duration between each file rotation operation
duration time.Duration
// LastCreated represents the creation time of the latest log
LastCreated time.Time
// FileInfo is the pointer to the current file being written to
FileInfo *os.File
// MaxBytes is the maximum number of desired bytes for a log file
MaxBytes int
// BytesWritten is the number of bytes written in the current log file
BytesWritten int64
// MaxFiles is the maximum number of rotated files to keep before removing them
MaxFiles int
// acquire is the mutex used to guard against concurrent writes and rotation
acquire sync.Mutex
}
func (l *logFile) fileNamePattern() string {
// Extract the file extension
fileExt := filepath.Ext(l.fileName)
// If we have no file extension we append .log
if fileExt == "" {
fileExt = ".log"
}
// Remove the file extension from the filename
return strings.TrimSuffix(l.fileName, fileExt) + "-%s" + fileExt
}
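// For example (illustrative): a fileName of "nomad.log" yields the pattern
// "nomad-%s.log", which openNew fills with a UnixNano timestamp,
// e.g. "nomad-1573777703696432000.log".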
func (l *logFile) openNew() error {
fileNamePattern := l.fileNamePattern()
// New file name has the format: filename-timestamp.extension
createTime := now()
newfileName := fmt.Sprintf(fileNamePattern, strconv.FormatInt(createTime.UnixNano(), 10))
newfilePath := filepath.Join(l.logPath, newfileName)
// Try creating a file. We truncate the file because we are the only authority to write the logs
filePointer, err := os.OpenFile(newfilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0640)
if err != nil {
return err
}
l.FileInfo = filePointer
// New file, new bytes tracker, new creation time :)
l.LastCreated = createTime
l.BytesWritten = 0
return nil
}
func (l *logFile) rotate() error {
// Get the time from the last point of contact
timeElapsed := time.Since(l.LastCreated)
// Rotate if we hit the byte file limit or the time limit
if (l.BytesWritten >= int64(l.MaxBytes) && (l.MaxBytes > 0)) || timeElapsed >= l.duration {
l.FileInfo.Close()
if err := l.pruneFiles(); err != nil {
return err
}
return l.openNew()
}
return nil
}
func (l *logFile) pruneFiles() error {
if l.MaxFiles == 0 {
return nil
}
pattern := l.fileNamePattern()
// Get all the files that match the log file pattern
globExpression := filepath.Join(l.logPath, fmt.Sprintf(pattern, "*"))
matches, err := filepath.Glob(globExpression)
if err != nil {
return err
}
// Sort the strings as filepath.Glob does not publicly guarantee that files
// are sorted, so here we add an extra defensive sort.
sort.Strings(matches)
// Prune if there are more files stored than the configured max
stale := len(matches) - l.MaxFiles
for i := 0; i < stale; i++ {
if err := os.Remove(matches[i]); err != nil {
return err
}
}
return nil
}
// Write is used to implement io.Writer
func (l *logFile) Write(b []byte) (int, error) {
// Filter out log entries that do not match log level criteria
if !l.logFilter.Check(b) {
return 0, nil
}
l.acquire.Lock()
defer l.acquire.Unlock()
// Create a new file if we have no file to write to
if l.FileInfo == nil {
if err := l.openNew(); err != nil {
return 0, err
}
}
// Check for the last contact and rotate if necessary
if err := l.rotate(); err != nil {
return 0, err
}
n, err := l.FileInfo.Write(b)
l.BytesWritten += int64(n)
return n, err
}
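// A minimal sketch of wiring logFile in as a log sink; all values are
// illustrative and io.MultiWriter keeps terminal output alongside the file:
//
//	lf := &logFile{
//		logFilter: LevelFilter(),
//		fileName:  "nomad.log",
//		logPath:   "/var/log",
//		MaxBytes:  1 << 20,        // rotate after ~1 MiB...
//		duration:  24 * time.Hour, // ...or daily, whichever comes first
//		MaxFiles:  5,              // keep at most 5 rotated files
//	}
//	logOutput := io.MultiWriter(os.Stderr, lf)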

View File

@@ -0,0 +1,171 @@
package agent
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/hashicorp/logutils"
"github.com/stretchr/testify/require"
)
const (
testFileName = "Nomad.log"
testDuration = 2 * time.Second
testBytes = 10
)
func TestLogFile_timeRotation(t *testing.T) {
t.Parallel()
require := require.New(t)
tempDir, err := ioutil.TempDir("", "LogWriterTimeTest")
require.NoError(err)
defer os.RemoveAll(tempDir)
filt := LevelFilter()
logFile := logFile{
logFilter: filt,
fileName: testFileName,
logPath: tempDir,
duration: testDuration,
}
logFile.Write([]byte("Hello World"))
time.Sleep(2 * time.Second)
logFile.Write([]byte("Second File"))
want := 2
if got, _ := ioutil.ReadDir(tempDir); len(got) != want {
t.Errorf("Expected %d files, got %v file(s)", want, len(got))
}
}
func TestLogFile_openNew(t *testing.T) {
t.Parallel()
require := require.New(t)
tempDir, err := ioutil.TempDir("", "LogWriterOpenTest")
require.NoError(err)
defer os.RemoveAll(tempDir)
logFile := logFile{fileName: testFileName, logPath: tempDir, duration: testDuration}
require.NoError(logFile.openNew())
_, err = ioutil.ReadFile(logFile.FileInfo.Name())
require.NoError(err)
}
func TestLogFile_byteRotation(t *testing.T) {
t.Parallel()
require := require.New(t)
tempDir, err := ioutil.TempDir("", "LogWriterByteTest")
require.NoError(err)
defer os.RemoveAll(tempDir)
filt := LevelFilter()
filt.MinLevel = logutils.LogLevel("INFO")
logFile := logFile{
logFilter: filt,
fileName: testFileName,
logPath: tempDir,
MaxBytes: testBytes,
duration: 24 * time.Hour,
}
logFile.Write([]byte("Hello World"))
logFile.Write([]byte("Second File"))
want := 2
tempFiles, _ := ioutil.ReadDir(tempDir)
require.Equal(want, len(tempFiles))
}
func TestLogFile_logLevelFiltering(t *testing.T) {
t.Parallel()
require := require.New(t)
tempDir, err := ioutil.TempDir("", "LogWriterFilterTest")
require.NoError(err)
defer os.RemoveAll(tempDir)
filt := LevelFilter()
logFile := logFile{
logFilter: filt,
fileName: testFileName,
logPath: tempDir,
MaxBytes: testBytes,
duration: testDuration,
}
logFile.Write([]byte("[INFO] This is an info message"))
logFile.Write([]byte("[DEBUG] This is a debug message"))
logFile.Write([]byte("[ERR] This is an error message"))
want := 2
tempFiles, _ := ioutil.ReadDir(tempDir)
require.Equal(want, len(tempFiles))
}
func TestLogFile_deleteArchives(t *testing.T) {
t.Parallel()
require := require.New(t)
tempDir, err := ioutil.TempDir("", "LogWriterDeleteArchivesTest")
require.NoError(err)
defer os.RemoveAll(tempDir)
filt := LevelFilter()
filt.MinLevel = logutils.LogLevel("INFO")
logFile := logFile{
logFilter: filt,
fileName: testFileName,
logPath: tempDir,
MaxBytes: testBytes,
duration: 24 * time.Hour,
MaxFiles: 1,
}
logFile.Write([]byte("[INFO] Hello World"))
logFile.Write([]byte("[INFO] Second File"))
logFile.Write([]byte("[INFO] Third File"))
want := 2
tempFiles, _ := ioutil.ReadDir(tempDir)
require.Equal(want, len(tempFiles))
for _, tempFile := range tempFiles {
var bytes []byte
var err error
path := filepath.Join(tempDir, tempFile.Name())
if bytes, err = ioutil.ReadFile(path); err != nil {
t.Error(err)
return
}
contents := string(bytes)
require.NotEqual("[INFO] Hello World", contents, "oldest log should have been deleted")
}
}
func TestLogFile_deleteArchivesDisabled(t *testing.T) {
t.Parallel()
require := require.New(t)
tempDir, err := ioutil.TempDir("", "LogWriterDeleteArchivesDisabledTest")
require.NoError(err)
defer os.RemoveAll(tempDir)
filt := LevelFilter()
filt.MinLevel = logutils.LogLevel("INFO")
logFile := logFile{
logFilter: filt,
fileName: testFileName,
logPath: tempDir,
MaxBytes: testBytes,
duration: 24 * time.Hour,
MaxFiles: 0,
}
logFile.Write([]byte("[INFO] Hello World"))
logFile.Write([]byte("[INFO] Second File"))
logFile.Write([]byte("[INFO] Third File"))
want := 3
tempFiles, _ := ioutil.ReadDir(tempDir)
require.Equal(want, len(tempFiles))
}

View File

@@ -1,83 +0,0 @@
package agent
import (
"sync"
)
// LogHandler interface is used for clients that want to subscribe
// to logs, for example to stream them over an IPC mechanism
type LogHandler interface {
HandleLog(string)
}
// logWriter implements io.Writer so it can be used as a log sink.
// It maintains a circular buffer of logs, and a set of handlers to
// which it can stream the logs to.
type logWriter struct {
sync.Mutex
logs []string
index int
handlers map[LogHandler]struct{}
}
// NewLogWriter creates a logWriter with the given buffer capacity
func NewLogWriter(buf int) *logWriter {
return &logWriter{
logs: make([]string, buf),
index: 0,
handlers: make(map[LogHandler]struct{}),
}
}
// RegisterHandler adds a log handler to receive logs, and sends
// the last buffered logs to the handler
func (l *logWriter) RegisterHandler(lh LogHandler) {
l.Lock()
defer l.Unlock()
// Do nothing if already registered
if _, ok := l.handlers[lh]; ok {
return
}
// Register
l.handlers[lh] = struct{}{}
// Send the old logs
if l.logs[l.index] != "" {
for i := l.index; i < len(l.logs); i++ {
lh.HandleLog(l.logs[i])
}
}
for i := 0; i < l.index; i++ {
lh.HandleLog(l.logs[i])
}
}
// DeregisterHandler removes a LogHandler and prevents more invocations
func (l *logWriter) DeregisterHandler(lh LogHandler) {
l.Lock()
defer l.Unlock()
delete(l.handlers, lh)
}
// Write is used to accumulate new logs
func (l *logWriter) Write(p []byte) (n int, err error) {
l.Lock()
defer l.Unlock()
// Strip off newlines at the end if there are any since we store
// individual log lines in the agent.
n = len(p)
if p[n-1] == '\n' {
p = p[:n-1]
}
l.logs[l.index] = string(p)
l.index = (l.index + 1) % len(l.logs)
for lh := range l.handlers {
lh.HandleLog(string(p))
}
return
}

View File

@@ -1,52 +0,0 @@
package agent
import (
"testing"
)
type MockLogHandler struct {
logs []string
}
func (m *MockLogHandler) HandleLog(l string) {
m.logs = append(m.logs, l)
}
func TestLogWriter(t *testing.T) {
t.Parallel()
h := &MockLogHandler{}
w := NewLogWriter(4)
// Write some logs
w.Write([]byte("one")) // Gets dropped!
w.Write([]byte("two"))
w.Write([]byte("three"))
w.Write([]byte("four"))
w.Write([]byte("five"))
// Register a handler, sends old!
w.RegisterHandler(h)
w.Write([]byte("six"))
w.Write([]byte("seven"))
// Deregister
w.DeregisterHandler(h)
w.Write([]byte("eight"))
w.Write([]byte("nine"))
out := []string{
"two",
"three",
"four",
"five",
"six",
"seven",
}
for idx := range out {
if out[idx] != h.logs[idx] {
t.Fatalf("mismatch %v", h.logs)
}
}
}

View File

@@ -121,7 +121,7 @@ func TestHTTP_FreshClientAllocMetrics(t *testing.T) {
terminal == float32(numTasks), nil
}, func(err error) {
require.Fail("timed out waiting for metrics to converge",
"pending: %v, running: %v, terminal: %v", pending, running, terminal)
"expected: (pending: 0, running: 0, terminal: %v), got: (pending: %v, running: %v, terminal: %v)", numTasks, pending, running, terminal)
})
})
}

View File

@@ -0,0 +1,172 @@
package monitor
import (
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor interface {
// Start returns a channel of log messages which are sent
// every time a log message occurs
Start() <-chan []byte
// Stop de-registers the sink from the InterceptLogger
// and closes the log channels
Stop()
}
// monitor implements the Monitor interface
type monitor struct {
// protects droppedCount and logCh
sync.Mutex
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
// logCh is a buffered chan where we send logs when streaming
logCh chan []byte
// doneCh coordinates the shutdown of logCh
doneCh chan struct{}
// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
// only access under lock
droppedCount int
bufSize int
// droppedDuration is the amount of time we should
// wait to check for dropped messages. Defaults
// to 3 seconds
droppedDuration time.Duration
}
// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor {
return new(buf, logger, opts)
}
func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor {
sw := &monitor{
logger: logger,
logCh: make(chan []byte, buf),
doneCh: make(chan struct{}, 1),
bufSize: buf,
droppedDuration: 3 * time.Second,
}
opts.Output = sw
sink := log.NewSinkAdapter(opts)
sw.sink = sink
return sw
}
// Stop deregisters the sink and stops the monitoring process
func (d *monitor) Stop() {
d.logger.DeregisterSink(d.sink)
close(d.doneCh)
}
// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte {
// register our sink with the logger
d.logger.RegisterSink(d.sink)
streamCh := make(chan []byte, d.bufSize)
// run a goroutine that listens for streamed
// log messages and sends them to streamCh
go func() {
defer close(streamCh)
for {
select {
case log := <-d.logCh:
select {
case <-d.doneCh:
return
case streamCh <- log:
}
case <-d.doneCh:
return
}
}
}()
// run a goroutine that periodically checks for
// dropped messages and makes room on the logCh
// to add a dropped message count warning
go func() {
// loop and check for dropped messages
for {
select {
case <-d.doneCh:
return
case <-time.After(d.droppedDuration):
d.Lock()
// Check if there have been any dropped messages.
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
case <-d.doneCh:
d.Unlock()
return
// Try sending dropped message count to logCh in case
// there is room in the buffer now.
case d.logCh <- []byte(dropped):
default:
// Drop a log message to make room for "Monitor dropped.." message
select {
case <-d.logCh:
d.droppedCount++
dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
default:
}
d.logCh <- []byte(dropped)
}
d.droppedCount = 0
}
// unlock after handling dropped message
d.Unlock()
}
}
}()
return streamCh
}
// Write attempts to send the latest log to logCh. It drops the log if
// the channel is unavailable to receive.
func (d *monitor) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()
// ensure logCh is still open
select {
case <-d.doneCh:
return
default:
}
bytes := make([]byte, len(p))
copy(bytes, p)
select {
case d.logCh <- bytes:
default:
d.droppedCount++
}
return len(p), nil
}
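// A minimal, hypothetical sketch of driving a Monitor from a handler,
// where w is any io.Writer and the logger options are illustrative:
//
//	m := New(512, srv.logger, &log.LoggerOptions{
//		Level:      log.Debug,
//		JSONFormat: false,
//	})
//	logCh := m.Start()
//	defer m.Stop()
//	for b := range logCh {
//		if _, err := w.Write(b); err != nil {
//			return // client went away
//		}
//	}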

View File

@@ -0,0 +1,88 @@
package monitor
import (
"fmt"
"strings"
"testing"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestMonitor_Start(t *testing.T) {
t.Parallel()
logger := log.NewInterceptLogger(&log.LoggerOptions{
Level: log.Error,
})
m := New(512, logger, &log.LoggerOptions{
Level: log.Debug,
})
logCh := m.Start()
defer m.Stop()
go func() {
logger.Debug("test log")
time.Sleep(10 * time.Millisecond)
}()
for {
select {
case log := <-logCh:
require.Contains(t, string(log), "[DEBUG] test log")
return
case <-time.After(3 * time.Second):
t.Fatal("Expected to receive from log channel")
}
}
}
// Ensure the number of dropped messages is logged
func TestMonitor_DroppedMessages(t *testing.T) {
t.Parallel()
logger := log.NewInterceptLogger(&log.LoggerOptions{
Level: log.Warn,
})
m := new(5, logger, &log.LoggerOptions{
Level: log.Debug,
})
m.droppedDuration = 5 * time.Millisecond
doneCh := make(chan struct{})
defer close(doneCh)
logCh := m.Start()
for i := 0; i <= 100; i++ {
logger.Debug(fmt.Sprintf("test message %d", i))
}
received := ""
passed := make(chan struct{})
go func() {
for {
select {
case recv := <-logCh:
received += string(recv)
if strings.Contains(received, "[WARN] Monitor dropped") {
close(passed)
return
}
}
}
}()
TEST:
for {
select {
case <-passed:
break TEST
case <-time.After(2 * time.Second):
require.Fail(t, "expected to see warn dropped messages")
}
}
}

View File

@@ -216,7 +216,7 @@ func (a *TestAgent) start() (*Agent, error) {
return nil, fmt.Errorf("unable to set up in memory metrics needed for agent initialization")
}
logger := hclog.New(&hclog.LoggerOptions{
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: "agent",
Level: hclog.LevelFromString(a.Config.LogLevel),
Output: a.LogOutput,

View File

@@ -13,6 +13,8 @@ log_level = "ERR"
log_json = true
log_file = "/var/log/nomad.log"
bind_addr = "192.168.0.1"
enable_debug = true

View File

@@ -143,6 +143,7 @@
],
"leave_on_interrupt": true,
"leave_on_terminate": true,
"log_file": "/var/log/nomad.log",
"log_json": true,
"log_level": "ERR",
"name": "my-web",

View File

@@ -99,17 +99,6 @@ func MockJob() *api.Job {
return job
}
func MockPeriodicJob() *api.Job {
j := MockJob()
j.Type = helper.StringToPtr("batch")
j.Periodic = &api.PeriodicConfig{
Enabled: helper.BoolToPtr(true),
SpecType: helper.StringToPtr("cron"),
Spec: helper.StringToPtr("*/30 * * * *"),
}
return j
}
func MockRegionalJob() *api.Job {
j := MockJob()
j.Region = helper.StringToPtr("north-america")

command/agent_monitor.go Normal file
View File

@@ -0,0 +1,137 @@
package command
import (
"fmt"
"io"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/cli"
)
type MonitorCommand struct {
Meta
}
func (c *MonitorCommand) Help() string {
helpText := `
Usage: nomad monitor [options]
Stream log messages of a Nomad agent. The monitor command lets you
listen for log levels that may be filtered out of the Nomad agent. For
example, your agent may only be logging at the INFO level, but with the
monitor command you can set -log-level DEBUG.
General Options:
` + generalOptionsUsage() + `
Monitor Specific Options:
-log-level <level>
Sets the log level to monitor (default: INFO)
-node-id <node-id>
Sets the specific node to monitor
-server-id <server-id>
Sets the specific server to monitor
-json
Sets log output to JSON format
`
return strings.TrimSpace(helpText)
}
func (c *MonitorCommand) Synopsis() string {
return "stream logs from a Nomad agent"
}
func (c *MonitorCommand) Name() string { return "monitor" }
func (c *MonitorCommand) Run(args []string) int {
c.Ui = &cli.PrefixedUi{
OutputPrefix: " ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: c.Ui,
}
var logLevel string
var nodeID string
var serverID string
var logJSON bool
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&logLevel, "log-level", "", "")
flags.StringVar(&nodeID, "node-id", "", "")
flags.StringVar(&serverID, "server-id", "", "")
flags.BoolVar(&logJSON, "json", false, "")
if err := flags.Parse(args); err != nil {
return 1
}
args = flags.Args()
if l := len(args); l != 0 {
c.Ui.Error("This command takes no arguments")
c.Ui.Error(commandErrorText(c))
return 1
}
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
c.Ui.Error(commandErrorText(c))
return 1
}
params := map[string]string{
"log_level": logLevel,
"node_id": nodeID,
"server_id": serverID,
"log_json": strconv.FormatBool(logJSON),
}
query := &api.QueryOptions{
Params: params,
}
eventDoneCh := make(chan struct{})
frames, errCh := client.Agent().Monitor(eventDoneCh, query)
select {
case err := <-errCh:
c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err))
c.Ui.Error(commandErrorText(c))
return 1
default:
}
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, errCh, eventDoneCh)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
defer r.Close()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-signalCh
// End the streaming
r.Close()
}()
_, err = io.Copy(os.Stdout, r)
if err != nil {
c.Ui.Error(fmt.Sprintf("error monitoring logs: %s", err))
return 1
}
return 0
}
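// Example invocations (illustrative):
//
//	$ nomad monitor -log-level=DEBUG -node-id=<node-id>
//	$ nomad monitor -log-level=TRACE -server-id=<server-id> -json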

View File

@@ -0,0 +1,36 @@
package command
import (
"strings"
"testing"
"github.com/mitchellh/cli"
)
func TestMonitorCommand_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &MonitorCommand{}
}
func TestMonitorCommand_Fails(t *testing.T) {
t.Parallel()
srv, _, _ := testServer(t, false, nil)
defer srv.Shutdown()
ui := new(cli.MockUi)
cmd := &MonitorCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 {
t.Fatalf("exepected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, commandErrorText(cmd)) {
t.Fatalf("expected help output, got: %s", out)
}
ui.ErrorWriter.Reset()
if code := cmd.Run([]string{"-address=nope"}); code != 1 {
t.Fatalf("exepected exit code 1, got: %d", code)
}
}

View File

@@ -105,8 +105,8 @@ func (l *AllocExecCommand) Run(args []string) int {
flags.BoolVar(&stdinOpt, "i", true, "")
stdinTty := isStdinTty()
flags.BoolVar(&ttyOpt, "t", stdinTty, "")
inferredTty := isTty()
flags.BoolVar(&ttyOpt, "t", inferredTty, "")
if err := flags.Parse(args); err != nil {
return 1
@@ -194,7 +194,6 @@ func (l *AllocExecCommand) Run(args []string) int {
if err != nil {
l.Ui.Error(err.Error())
l.Ui.Error("\nPlease specify the task.")
return 1
}
}
@@ -292,9 +291,11 @@ func (l *AllocExecCommand) execImpl(client *api.Client, alloc *api.Allocation, t
alloc, task, tty, command, stdin, stdout, stderr, sizeCh, nil)
}
func isStdinTty() bool {
_, isTerminal := term.GetFdInfo(os.Stdin)
return isTerminal
// isTty returns true if both stdin and stdout are a TTY
func isTty() bool {
_, isStdinTerminal := term.GetFdInfo(os.Stdin)
_, isStdoutTerminal := term.GetFdInfo(os.Stdout)
return isStdinTerminal && isStdoutTerminal
}
// setRawTerminal sets the stream terminal in raw mode, so process captures

View File

@@ -190,7 +190,6 @@ func (l *AllocLogsCommand) Run(args []string) int {
if err != nil {
l.Ui.Error(err.Error())
l.Ui.Error("\nPlease specify the task.")
return 1
}
}

View File

@@ -234,13 +234,13 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength
}
basic := []string{
fmt.Sprintf("ID|%s", limit(alloc.ID, uuidLength)),
fmt.Sprintf("ID|%s", alloc.ID),
fmt.Sprintf("Eval ID|%s", limit(alloc.EvalID, uuidLength)),
fmt.Sprintf("Name|%s", alloc.Name),
fmt.Sprintf("Node ID|%s", limit(alloc.NodeID, uuidLength)),
fmt.Sprintf("Node Name|%s", alloc.NodeName),
fmt.Sprintf("Job ID|%s", alloc.JobID),
fmt.Sprintf("Job Version|%d", alloc.Job.Version),
fmt.Sprintf("Job Version|%d", *alloc.Job.Version),
fmt.Sprintf("Client Status|%s", alloc.ClientStatus),
fmt.Sprintf("Client Description|%s", alloc.ClientDescription),
fmt.Sprintf("Desired Status|%s", alloc.DesiredStatus),

View File

@@ -286,6 +286,9 @@ func TestAllocStatusCommand_ScoreMetrics(t *testing.T) {
require.Contains(out, "Placement Metrics")
require.Contains(out, mockNode1.ID)
require.Contains(out, mockNode2.ID)
// assert we sort headers alphabetically
require.Contains(out, "binpack node-affinity")
require.Contains(out, "final score")
}

View File

@@ -366,6 +366,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"monitor": func() (cli.Command, error) {
return &MonitorCommand{
Meta: meta,
}, nil
},
"namespace": func() (cli.Command, error) {
return &NamespaceCommand{
Meta: meta,

View File

@@ -421,10 +421,10 @@ func (j *JobGetter) ApiJob(jpath string) (*api.Job, error) {
return nil, fmt.Errorf("Error getting jobfile from %q: %v", jpath, err)
} else {
file, err := os.Open(job.Name())
defer file.Close()
if err != nil {
return nil, fmt.Errorf("Error opening file %q: %v", jpath, err)
}
defer file.Close()
jobfile = file
}
}

View File

@@ -174,17 +174,17 @@ func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) {
expected := []byte("hello world")
resultCh := make(chan struct{})
errCh := make(chan error)
resultCh := make(chan []byte)
go func() {
defer close(resultCh)
defer close(errCh)
outBytes, err := ioutil.ReadAll(limit)
if err != nil {
t.Fatalf("ReadAll failed: %v", err)
}
if reflect.DeepEqual(outBytes, expected) {
close(resultCh)
errCh <- fmt.Errorf("ReadAll failed: %v", err)
return
}
resultCh <- outBytes
}()
// Send the data
@@ -192,7 +192,14 @@ func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) {
in.Close()
select {
case <-resultCh:
case err := <-errCh:
if err != nil {
t.Fatalf("ReadAll: %v", err)
}
case outBytes := <-resultCh:
if !reflect.DeepEqual(outBytes, expected) {
t.Fatalf("got:%s, expected,%s", string(outBytes), string(expected))
}
case <-time.After(1 * time.Second):
t.Fatalf("did not exit by time limit")
}

View File

@@ -36,7 +36,7 @@ Deployments Options:
-verbose
Display full information.
-all-allocs
-all
Display all deployments matching the job ID, including those
from an older instance of the job.
`

View File

@@ -462,7 +462,7 @@ func formatAllocList(allocations []*api.Allocation, verbose bool, uuidLength int
limit(alloc.EvalID, uuidLength),
limit(alloc.NodeID, uuidLength),
alloc.TaskGroup,
alloc.Job.Version,
*alloc.Job.Version,
alloc.DesiredStatus,
alloc.ClientStatus,
formatUnixNanoTime(alloc.CreateTime),
@@ -478,7 +478,7 @@ func formatAllocList(allocations []*api.Allocation, verbose bool, uuidLength int
limit(alloc.ID, uuidLength),
limit(alloc.NodeID, uuidLength),
alloc.TaskGroup,
alloc.Job.Version,
*alloc.Job.Version,
alloc.DesiredStatus,
alloc.ClientStatus,
createTimePretty,

View File

@@ -50,11 +50,12 @@ type Meta struct {
// token is used for ACLs to access privileged information
token string
caCert string
caPath string
clientCert string
clientKey string
insecure bool
caCert string
caPath string
clientCert string
clientKey string
tlsServerName string
insecure bool
}
// FlagSet returns a FlagSet with the common flags that every
@@ -76,6 +77,7 @@ func (m *Meta) FlagSet(n string, fs FlagSetFlags) *flag.FlagSet {
f.StringVar(&m.clientCert, "client-cert", "", "")
f.StringVar(&m.clientKey, "client-key", "", "")
f.BoolVar(&m.insecure, "insecure", false, "")
f.StringVar(&m.tlsServerName, "tls-server-name", "", "")
f.BoolVar(&m.insecure, "tls-skip-verify", false, "")
f.StringVar(&m.token, "token", "", "")
@@ -113,6 +115,7 @@ func (m *Meta) AutocompleteFlags(fs FlagSetFlags) complete.Flags {
"-client-cert": complete.PredictFiles("*"),
"-client-key": complete.PredictFiles("*"),
"-insecure": complete.PredictNothing,
"-tls-server-name": complete.PredictNothing,
"-tls-skip-verify": complete.PredictNothing,
"-token": complete.PredictAnything,
}
@@ -136,13 +139,14 @@ func (m *Meta) Client() (*api.Client, error) {
}
// If we need custom TLS configuration, then set it
if m.caCert != "" || m.caPath != "" || m.clientCert != "" || m.clientKey != "" || m.insecure {
if m.caCert != "" || m.caPath != "" || m.clientCert != "" || m.clientKey != "" || m.tlsServerName != "" || m.insecure {
t := &api.TLSConfig{
CACert: m.caCert,
CAPath: m.caPath,
ClientCert: m.clientCert,
ClientKey: m.clientKey,
Insecure: m.insecure,
CACert: m.caCert,
CAPath: m.caPath,
ClientCert: m.clientCert,
ClientKey: m.clientKey,
TLSServerName: m.tlsServerName,
Insecure: m.insecure,
}
config.TLSConfig = t
}
@@ -204,6 +208,10 @@ func generalOptionsUsage() string {
Path to an unencrypted PEM encoded private key matching the
client certificate from -client-cert. Overrides the
NOMAD_CLIENT_KEY environment variable if set.
-tls-server-name=<value>
The server name to use as the SNI host when connecting via
TLS. Overrides the NOMAD_TLS_SERVER_NAME environment variable if set.
-tls-skip-verify
Do not verify TLS certificate. This is highly not recommended. Verification

View File

@@ -29,6 +29,7 @@ func TestMeta_FlagSet(t *testing.T) {
"client-cert",
"client-key",
"insecure",
"tls-server-name",
"tls-skip-verify",
"token",
},

View File

@@ -2,6 +2,7 @@ package command
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@@ -380,7 +381,16 @@ func formatAllocMetrics(metrics *api.AllocationMetric, scores bool, prefix strin
// Add header as first row
if i == 0 {
scoreOutput[0] = "Node|"
for scorerName := range scoreMeta.Scores {
// sort scores alphabetically
scores := make([]string, 0, len(scoreMeta.Scores))
for score := range scoreMeta.Scores {
scores = append(scores, score)
}
sort.Strings(scores)
// build score header output
for _, scorerName := range scores {
scoreOutput[0] += fmt.Sprintf("%v|", scorerName)
scorerNames = append(scorerNames, scorerName)
}
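// The general Go pattern used above for deterministic map iteration:
//
//	keys := make([]string, 0, len(m))
//	for k := range m {
//		keys = append(keys, k)
//	}
//	sort.Strings(keys)
//	for _, k := range keys {
//		// consume m[k] in a stable order
//	}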

View File

@@ -23,7 +23,7 @@ Usage: nomad namespace <subcommand> [options] [args]
Create or update a namespace:
$ nomad namespace apply <name> -description "My new namespace"
$ nomad namespace apply -description "My new namespace" <name>
List namespaces:

View File

@@ -238,7 +238,7 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
}
expected := fmt.Sprintf("All allocations on node %q have stopped.\n", nodeID)
expected := fmt.Sprintf("All allocations on node %q have stopped\n", nodeID)
if !strings.HasSuffix(out, expected) {
t.Fatalf("expected output to end with:\n%s", expected)
}

View File

@@ -333,7 +333,7 @@ func formatDrain(n *api.Node) string {
func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
// Format the header output
basic := []string{
fmt.Sprintf("ID|%s", limit(node.ID, c.length)),
fmt.Sprintf("ID|%s", node.ID),
fmt.Sprintf("Name|%s", node.Name),
fmt.Sprintf("Class|%s", node.NodeClass),
fmt.Sprintf("DC|%s", node.Datacenter),

View File

@@ -137,11 +137,8 @@ func TestNodeStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, "mynode") {
t.Fatalf("expect to find mynode, got: %s", out)
}
if strings.Contains(out, nodeID) {
t.Fatalf("expected truncated node id, got: %s", out)
}
if !strings.Contains(out, nodeID[:8]) {
t.Fatalf("expected node id %q, got: %s", nodeID[:8], out)
if !strings.Contains(out, nodeID) {
t.Fatalf("expected node id %q, got: %s", nodeID, out)
}
ui.OutputWriter.Reset()

View File

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/hcl/hcl/ast"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/jobspec"
"github.com/mitchellh/mapstructure"
"github.com/posener/complete"
)
@@ -261,6 +262,7 @@ func parseQuotaResource(result *api.Resources, list *ast.ObjectList) error {
valid := []string{
"cpu",
"memory",
"network",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, "resources ->")
@@ -275,5 +277,20 @@ func parseQuotaResource(result *api.Resources, list *ast.ObjectList) error {
return err
}
// Find the network ObjectList, parse it
nw := listVal.Filter("network")
if len(nw.Items) > 0 {
rl, err := jobspec.ParseNetwork(nw)
if err != nil {
return multierror.Prefix(err, "resources ->")
}
if rl != nil {
if rl.Mode != "" || rl.HasPorts() {
return fmt.Errorf("resources -> network only allows mbits")
}
result.Networks = []*api.NetworkResource{rl}
}
}
return nil
}

View File

@@ -8,8 +8,10 @@ import (
"strings"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestQuotaApplyCommand_Implements(t *testing.T) {
@@ -97,3 +99,42 @@ func TestQuotaApplyCommand_Good_JSON(t *testing.T) {
assert.Nil(t, err)
assert.Len(t, quotas, 1)
}
func TestQuotaApplyNetwork(t *testing.T) {
t.Parallel()
mbits := 20
cases := []struct {
hcl string
q *api.QuotaSpec
err string
}{{
hcl: `limit {region = "global", region_limit {network {mbits = 20}}}`,
q: &api.QuotaSpec{
Limits: []*api.QuotaLimit{{
Region: "global",
RegionLimit: &api.Resources{
Networks: []*api.NetworkResource{{
MBits: &mbits,
}},
},
}},
},
err: "",
}, {
hcl: `limit {region = "global", region_limit {network { mbits = 20, device = "eth0"}}}`,
q: nil,
err: "1 error(s) occurred:\n\n* limit -> region_limit -> resources -> network -> invalid key: device",
}}
for _, c := range cases {
t.Run(c.hcl, func(t *testing.T) {
q, err := parseQuotaSpec([]byte(c.hcl))
require.Equal(t, c.q, q)
if c.err != "" {
require.EqualError(t, err, c.err)
}
})
}
}