From 8095b4868a2379c629943e1d87f044c53ed92cff Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 15 Oct 2019 15:14:25 -0400 Subject: [PATCH] New monitor pkg for shared monitor functionality Adds new package that can be used by client and server RPC endpoints to facilitate monitoring based off of a logger clean up old code small comment about write rm old comment about minsize rename to Monitor Removes connection logic from monitor command Keep connection logic in endpoints, use a channel to send results from monitoring use new multisink logger and interfaces small test for dropped messages update go-hclogger and update sink/intercept logger interfaces --- client/client.go | 8 +- client/config/config.go | 2 +- client/monitor_endpoint.go | 87 ++----- client/monitor_endpoint_test.go | 95 +++++++ client/structs/structs.go | 3 + command/agent/agent.go | 8 +- command/agent/agent_endpoint.go | 245 ++++++------------ command/agent/command.go | 4 +- command/agent/http.go | 2 - command/agent/monitor/monitor.go | 72 +++++ command/agent/monitor/monitor_test.go | 62 +++++ command/agent/testagent.go | 2 +- command/assets/example-short.nomad | 2 +- command/assets/example.nomad | 2 +- dev/docker-clients/client.nomad | 2 +- e2e/consul/input/consul_example.nomad | 2 +- e2e/metrics/input/helloworld.nomad | 2 +- e2e/metrics/input/redis.nomad | 2 +- e2e/metrics/input/simpleweb.nomad | 2 +- e2e/prometheus/prometheus.nomad | 2 +- helper/testlog/testlog.go | 4 +- nomad/client_monitor_endpoint.go | 186 +++++++++++++ nomad/config.go | 2 +- nomad/server.go | 8 +- .../hashicorp/go-hclog/colorize_unix.go | 27 ++ .../hashicorp/go-hclog/colorize_windows.go | 33 +++ vendor/github.com/hashicorp/go-hclog/go.mod | 5 + vendor/github.com/hashicorp/go-hclog/go.sum | 10 + .../hashicorp/go-hclog/interceptlogger.go | 195 ++++++++++++++ .../hashicorp/go-hclog/intlogger.go | 137 +++++----- .../github.com/hashicorp/go-hclog/logger.go | 68 ++++- .../hashicorp/go-hclog/nulllogger.go | 4 + .../github.com/hashicorp/go-hclog/writer.go | 20 +- vendor/vendor.json | 2 +- 34 files changed, 969 insertions(+), 338 deletions(-) create mode 100644 client/monitor_endpoint_test.go create mode 100644 command/agent/monitor/monitor.go create mode 100644 command/agent/monitor/monitor_test.go create mode 100644 nomad/client_monitor_endpoint.go create mode 100644 vendor/github.com/hashicorp/go-hclog/colorize_unix.go create mode 100644 vendor/github.com/hashicorp/go-hclog/colorize_windows.go create mode 100644 vendor/github.com/hashicorp/go-hclog/interceptlogger.go diff --git a/client/client.go b/client/client.go index a1bf34c08..0b68e3836 100644 --- a/client/client.go +++ b/client/client.go @@ -163,8 +163,8 @@ type Client struct { configCopy *config.Config configLock sync.RWMutex - logger hclog.MultiSinkLogger - rpcLogger hclog.MultiSinkLogger + logger hclog.InterceptLogger + rpcLogger hclog.Logger connPool *pool.ConnPool @@ -304,7 +304,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic } // Create the logger - logger := cfg.Logger.ResetNamed("client").(hclog.MultiSinkLogger) + logger := cfg.Logger.ResetNamedIntercept("client") // Create the client c := &Client{ @@ -316,7 +316,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic tlsWrap: tlsWrap, streamingRpcs: structs.NewStreamingRpcRegistry(), logger: logger, - rpcLogger: logger.Named("rpc").(hclog.MultiSinkLogger), + rpcLogger: logger.Named("rpc"), allocs: make(map[string]AllocRunner), 
allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), diff --git a/client/config/config.go b/client/config/config.go index 0df3cac27..d9cabf8d8 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -81,7 +81,7 @@ type Config struct { LogOutput io.Writer // Logger provides a logger to thhe client - Logger log.MultiSinkLogger + Logger log.InterceptLogger // Region is the clients region Region string diff --git a/client/monitor_endpoint.go b/client/monitor_endpoint.go index 9fd6d9229..6d4aa4183 100644 --- a/client/monitor_endpoint.go +++ b/client/monitor_endpoint.go @@ -4,10 +4,11 @@ import ( "context" "errors" "io" - "sync" + "strings" "time" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -62,39 +63,36 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) { return } + stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) + defer close(stopCh) defer cancel() - streamWriter := newStreamWriter(512) - - streamLog := log.New(&log.LoggerOptions{ - Level: logLevel, - Output: streamWriter, + monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{ + Level: logLevel, + JSONFormat: false, }) - m.c.logger.RegisterSink(streamLog) - defer m.c.logger.DeregisterSink(streamLog) go func() { - for { - if _, err := conn.Read(nil); err != nil { - // One end of the pipe was explicitly closed, exit cleanly - cancel() - return - } - select { - case <-ctx.Done(): - return - } + if _, err := conn.Read(nil); err != nil { + close(stopCh) + cancel() + return + } + select { + case <-ctx.Done(): + return } }() + logCh := monitor.Start(stopCh) + var streamErr error OUTER: for { select { - case log := <-streamWriter.logCh: + case log := <-logCh: var resp cstructs.StreamErrWrapper - resp.Payload = log if err := encoder.Encode(resp); err != nil { streamErr = err @@ -107,46 +105,15 @@ OUTER: } if streamErr != nil { - handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder) + // Nothing to do as conn is closed + if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") { + return + } + + // Attempt to send the error + encoder.Encode(&cstructs.StreamErrWrapper{ + Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)), + }) return } - -} - -type streamWriter struct { - sync.Mutex - logs []string - logCh chan []byte - index int - droppedCount int -} - -func newStreamWriter(buf int) *streamWriter { - return &streamWriter{ - logs: make([]string, buf), - logCh: make(chan []byte, buf), - index: 0, - } -} - -func (d *streamWriter) Write(p []byte) (n int, err error) { - d.Lock() - defer d.Unlock() - - // Strip off newlines at the end if there are any since we store - // individual log lines in the agent. 
- // n = len(p) - // if p[n-1] == '\n' { - // p = p[:n-1] - // } - - d.logs[d.index] = string(p) - d.index = (d.index + 1) % len(d.logs) - - select { - case d.logCh <- p: - default: - d.droppedCount++ - } - return } diff --git a/client/monitor_endpoint_test.go b/client/monitor_endpoint_test.go new file mode 100644 index 000000000..ca95cf731 --- /dev/null +++ b/client/monitor_endpoint_test.go @@ -0,0 +1,95 @@ +package client + +import ( + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" + "github.com/ugorji/go/codec" +) + +func TestMonitor_Monitor(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server and client + s := nomad.TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + c, cleanup := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + defer cleanup() + + req := cstructs.MonitorRequest{ + LogLevel: "debug", + NodeID: c.NodeID(), + } + + handler, err := c.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(1 * time.Second) + expected := "[DEBUG]" + received := "" + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for logs") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + received += string(msg.Payload) + if strings.Contains(received, expected) { + require.Nil(p2.Close()) + break OUTER + } + } + } +} diff --git a/client/structs/structs.go b/client/structs/structs.go index 6a6206744..350e9de69 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -41,6 +41,9 @@ type MonitorRequest struct { // LogJSON specifies if log format should be unstructured or json LogJSON bool + // NodeID is the node we want to track the logs of + NodeID string + structs.QueryOptions } diff --git a/command/agent/agent.go b/command/agent/agent.go index b02e6a765..26f9e00a8 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -53,8 +53,8 @@ type Agent struct { config *Config configLock sync.Mutex - logger log.MultiSinkLogger - httpLogger log.MultiSinkLogger + logger log.InterceptLogger + httpLogger log.Logger logOutput io.Writer // consulService is Nomad's custom Consul client for managing services @@ -87,7 +87,7 @@ type Agent struct { } // NewAgent is used to create a new agent with the given configuration -func NewAgent(config *Config, logger log.MultiSinkLogger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) { +func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) 
{ a := &Agent{ config: config, logOutput: logOutput, @@ -97,7 +97,7 @@ func NewAgent(config *Config, logger log.MultiSinkLogger, logOutput io.Writer, i // Create the loggers a.logger = logger - a.httpLogger = a.logger.ResetNamed("http").(log.MultiSinkLogger) + a.httpLogger = a.logger.ResetNamed("http") // Global logger should match internal logger as much as possible golog.SetFlags(golog.LstdFlags | golog.Lmicroseconds) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index e286e1f6f..b99b62f60 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -10,7 +10,6 @@ import ( "net/http" "sort" "strings" - "sync" "github.com/docker/docker/pkg/ioutils" log "github.com/hashicorp/go-hclog" @@ -175,182 +174,102 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel)) } - // START - // Determine if we are targeting a server or client nodeID := req.URL.Query().Get("nodeID") - if nodeID != "" { - // Build the request and parse the ACL token - args := cstructs.MonitorRequest{ - LogLevel: logLevel, - LogJSON: false, - } - s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + // Build the request and parse the ACL token + args := cstructs.MonitorRequest{ + NodeID: nodeID, + LogLevel: logLevel, + LogJSON: false, + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) - // Determine the handler to use - useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID) + // Determine the handler to use + useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID) - // Make the RPC - var handler structs.StreamingRpcHandler - var handlerErr error - if useLocalClient { - handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor") - } else if useClientRPC { - handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor") - } else if useServerRPC { - handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor") - } else { - handlerErr = CodedError(400, "No local Node and node_id not provided") - } - - if handlerErr != nil { - return nil, CodedError(500, handlerErr.Error()) - } - httpPipe, handlerPipe := net.Pipe() - decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) - encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-ctx.Done() - httpPipe.Close() - }() - - // Create an ouput that gets flushed on every write - output := ioutils.NewWriteFlusher(resp) - - // Create a channel that decodes the results - errCh := make(chan HTTPCodedError, 2) - - // stream the response - go func() { - defer cancel() - - // Send the request - if err := encoder.Encode(args); err != nil { - errCh <- CodedError(500, err.Error()) - return - } - - for { - select { - case <-ctx.Done(): - errCh <- nil - return - default: - } - - var res cstructs.StreamErrWrapper - if err := decoder.Decode(&res); err != nil { - errCh <- CodedError(500, err.Error()) - return - } - decoder.Reset(httpPipe) - - if err := res.Error; err != nil { - if err.Code != nil { - errCh <- CodedError(int(*err.Code), err.Error()) - return - } - } - - if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil { - errCh <- CodedError(500, err.Error()) - return - } - } - }() - - handler(handlerPipe) - cancel() - codedErr := <-errCh - - if codedErr != nil && - (codedErr == io.EOF || - 
strings.Contains(codedErr.Error(), "closed") || - strings.Contains(codedErr.Error(), "EOF")) { - codedErr = nil - } - return nil, codedErr + // Make the RPC + var handler structs.StreamingRpcHandler + var handlerErr error + if useLocalClient { + handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor") + } else if useClientRPC { + handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor") + } else if useServerRPC { + handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor") } else { - // Create flusher for streaming - flusher, ok := resp.(http.Flusher) - if !ok { - return nil, CodedError(400, "Streaming not supported") + handlerErr = CodedError(400, "No local Node and node_id not provided") + } + + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) + } + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + httpPipe.Close() + }() + + // Create an ouput that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + // Create a channel that decodes the results + errCh := make(chan HTTPCodedError, 2) + + // stream the response + go func() { + defer cancel() + + // Send the request + if err := encoder.Encode(args); err != nil { + errCh <- CodedError(500, err.Error()) + return } - streamWriter := newStreamWriter(512) - streamLog := log.New(&log.LoggerOptions{ - Level: log.LevelFromString(logLevel), - Output: streamWriter, - }) - s.agent.logger.RegisterSink(streamLog) - defer s.agent.logger.DeregisterSink(streamLog) - - notify := resp.(http.CloseNotifier).CloseNotify() - - // Send header so client can start streaming body - resp.WriteHeader(http.StatusOK) - // gziphanlder needs a byte to be written and flushed in order - // to tell gzip handler to ignore this response and not compress - resp.Write([]byte("\n")) - flusher.Flush() - for { select { - case <-notify: - s.agent.logger.DeregisterSink(streamLog) - if streamWriter.droppedCount > 0 { - s.agent.logger.Warn(fmt.Sprintf("Dropped %d logs during monitor request", streamWriter.droppedCount)) + case <-ctx.Done(): + errCh <- nil + return + default: + } + + var res cstructs.StreamErrWrapper + if err := decoder.Decode(&res); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + decoder.Reset(httpPipe) + + if err := res.Error; err != nil { + if err.Code != nil { + errCh <- CodedError(int(*err.Code), err.Error()) + return } - return nil, nil - case log := <-streamWriter.logCh: - fmt.Fprintln(resp, log) - flusher.Flush() + } + + if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil { + errCh <- CodedError(500, err.Error()) + return } } + }() + + handler(handlerPipe) + cancel() + codedErr := <-errCh + + if codedErr != nil && + (codedErr == io.EOF || + strings.Contains(codedErr.Error(), "closed") || + strings.Contains(codedErr.Error(), "EOF")) { + codedErr = nil } - - return nil, nil -} - -type streamWriter struct { - sync.Mutex - logs []string - logCh chan string - index int - droppedCount int -} - -func newStreamWriter(buf int) *streamWriter { - return &streamWriter{ - logs: make([]string, buf), - logCh: make(chan string, buf), - index: 0, - } -} - -func (d *streamWriter) Write(p []byte) (n int, err error) { - d.Lock() - defer d.Unlock() - - // Strip off newlines at the end if there are any since we store - // individual log lines 
in the agent. - n = len(p) - if p[n-1] == '\n' { - p = p[:n-1] - } - - d.logs[d.index] = string(p) - d.index = (d.index + 1) % len(d.logs) - - select { - case d.logCh <- string(p): - default: - d.droppedCount++ - } - return + return nil, codedErr } func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/command.go b/command/agent/command.go index af20fb0ff..c0567bda8 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -447,7 +447,7 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, io.Writer) } // setupAgent is used to start the agent and various interfaces -func (c *Command) setupAgent(config *Config, logger hclog.MultiSinkLogger, logOutput io.Writer, inmem *metrics.InmemSink) error { +func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) error { c.Ui.Output("Starting Nomad agent...") agent, err := NewAgent(config, logger, logOutput, inmem) if err != nil { @@ -602,7 +602,7 @@ func (c *Command) Run(args []string) int { } // Create logger - logger := hclog.NewMultiSink(&hclog.LoggerOptions{ + logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ Name: "agent", Level: hclog.LevelFromString(config.LogLevel), Output: logOutput, diff --git a/command/agent/http.go b/command/agent/http.go index 1ee6540c4..756b9ea5a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -106,8 +106,6 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) { srv.registerHandlers(config.EnableDebug) // Handle requests with gzip compression - // Use MinSize of 1 to allow a zero byte flush to return - // response header used for streaming gzip, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(0)) if err != nil { return nil, err diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go new file mode 100644 index 000000000..7c56270b0 --- /dev/null +++ b/command/agent/monitor/monitor.go @@ -0,0 +1,72 @@ +package monitor + +import ( + "sync" + + log "github.com/hashicorp/go-hclog" +) + +type Monitor struct { + sync.Mutex + sink log.SinkAdapter + logger log.InterceptLogger + logCh chan []byte + index int + droppedCount int + bufSize int +} + +func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor { + sw := &Monitor{ + logger: logger, + logCh: make(chan []byte, buf), + index: 0, + bufSize: buf, + } + + opts.Output = sw + sink := log.NewSinkAdapter(opts) + sw.sink = sink + + return sw +} + +func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { + d.logger.RegisterSink(d.sink) + + logCh := make(chan []byte, d.bufSize) + go func() { + for { + select { + case log := <-d.logCh: + logCh <- log + case <-stopCh: + d.logger.DeregisterSink(d.sink) + close(d.logCh) + return + } + } + }() + + return logCh +} + +// Write attemps to send latest log to logCh +// it drops the log if channel is unavailable to receive +func (d *Monitor) Write(p []byte) (n int, err error) { + bytes := make([]byte, len(p)) + copy(bytes, p) + + select { + case d.logCh <- bytes: + default: + d.Lock() + defer d.Unlock() + d.droppedCount++ + if d.droppedCount > 10 { + d.logger.Warn("Monitor dropped %d logs during monitor request", d.droppedCount) + d.droppedCount = 0 + } + } + return +} diff --git a/command/agent/monitor/monitor_test.go b/command/agent/monitor/monitor_test.go new file mode 100644 index 000000000..21be76b47 --- /dev/null +++ b/command/agent/monitor/monitor_test.go 
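
The monitor package above is the core of the change, so a brief usage sketch may help reviewers: a root InterceptLogger kept at INFO wired to a DEBUG-level Monitor sink, following the New and Start signatures introduced in monitor.go. This is an illustrative, hypothetical snippet rather than code from the patch; the 512 buffer size simply mirrors what the endpoints use.

package main

import (
	"fmt"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/command/agent/monitor"
)

func main() {
	// Root logger filters at INFO; the monitor sink gets its own, lower level.
	logger := log.NewInterceptLogger(&log.LoggerOptions{
		Name:  "agent",
		Level: log.Info,
	})

	m := monitor.New(512, logger, &log.LoggerOptions{
		Level:      log.Debug,
		JSONFormat: false,
	})

	stopCh := make(chan struct{})
	// Start registers the sink on the root logger and returns a buffered
	// channel of raw log frames.
	logCh := m.Start(stopCh)

	go func() {
		for {
			select {
			case frame := <-logCh:
				// An RPC endpoint would wrap this in a StreamErrWrapper payload.
				fmt.Print(string(frame))
			case <-stopCh:
				return
			}
		}
	}()

	logger.Debug("seen only by the monitor sink")
	logger.Info("seen by the root logger and the sink")

	// Closing stopCh deregisters the sink and stops the forwarding goroutine.
	close(stopCh)
}

Note the backpressure choice in Write above: when the channel is full the log line is dropped and droppedCount is incremented, so a slow monitor consumer cannot block the agent's logging path.
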
@@ -0,0 +1,62 @@ +package monitor + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + log "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestMonitor_Start(t *testing.T) { + t.Parallel() + + logger := log.NewInterceptLogger(&log.LoggerOptions{ + Level: log.Error, + }) + + m := New(512, logger, &log.LoggerOptions{ + Level: log.Debug, + }) + + closeCh := make(chan struct{}) + defer close(closeCh) + + logCh := m.Start(closeCh) + go func() { + for { + select { + case log := <-logCh: + require.Contains(t, string(log), "[DEBUG] test log") + case <-time.After(1 * time.Second): + t.Fatal("Expected to receive from log channel") + } + } + }() + logger.Debug("test log") +} + +func TestMonitor_DroppedMessages(t *testing.T) { + t.Parallel() + + logger := log.NewInterceptLogger(&log.LoggerOptions{ + Level: log.Warn, + }) + + m := New(5, logger, &log.LoggerOptions{ + Level: log.Debug, + }) + + doneCh := make(chan struct{}) + defer close(doneCh) + + m.Start(doneCh) + + for i := 0; i <= 6; i++ { + logger.Debug("test message") + } + + assert.Equal(t, 1, m.droppedCount) +} diff --git a/command/agent/testagent.go b/command/agent/testagent.go index 8b5bb6afd..1dc2e0d82 100644 --- a/command/agent/testagent.go +++ b/command/agent/testagent.go @@ -216,7 +216,7 @@ func (a *TestAgent) start() (*Agent, error) { return nil, fmt.Errorf("unable to set up in memory metrics needed for agent initialization") } - logger := hclog.NewMultiSink(&hclog.LoggerOptions{ + logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ Name: "agent", Level: hclog.LevelFromString(a.Config.LogLevel), Output: a.LogOutput, diff --git a/command/assets/example-short.nomad b/command/assets/example-short.nomad index ae3de97d3..d450ff7a1 100644 --- a/command/assets/example-short.nomad +++ b/command/assets/example-short.nomad @@ -19,7 +19,7 @@ job "example" { network { mbits = 10 - port "db" {} + port "db" {} } } } diff --git a/command/assets/example.nomad b/command/assets/example.nomad index 8f8d5fffa..f78dc356b 100644 --- a/command/assets/example.nomad +++ b/command/assets/example.nomad @@ -316,7 +316,7 @@ job "example" { network { mbits = 10 - port "db" {} + port "db" {} } } # The "service" stanza instructs Nomad to register this task as a service diff --git a/dev/docker-clients/client.nomad b/dev/docker-clients/client.nomad index 4689df775..37248bfce 100644 --- a/dev/docker-clients/client.nomad +++ b/dev/docker-clients/client.nomad @@ -23,7 +23,7 @@ job "client" { network { mbits = 10 - port "http"{} + port "http" {} } } diff --git a/e2e/consul/input/consul_example.nomad b/e2e/consul/input/consul_example.nomad index 18b02be7c..24217b842 100644 --- a/e2e/consul/input/consul_example.nomad +++ b/e2e/consul/input/consul_example.nomad @@ -49,7 +49,7 @@ job "consul-example" { network { mbits = 10 - port "db" {} + port "db" {} } } diff --git a/e2e/metrics/input/helloworld.nomad b/e2e/metrics/input/helloworld.nomad index bd8cfb443..f8fed4ed8 100644 --- a/e2e/metrics/input/helloworld.nomad +++ b/e2e/metrics/input/helloworld.nomad @@ -29,7 +29,7 @@ job "hello" { network { mbits = 10 - port "web" {} + port "web" {} } } diff --git a/e2e/metrics/input/redis.nomad b/e2e/metrics/input/redis.nomad index 27d8a5d84..2fedaed87 100644 --- a/e2e/metrics/input/redis.nomad +++ b/e2e/metrics/input/redis.nomad @@ -39,7 +39,7 @@ job "redis" { network { mbits = 10 - port "db" {} + port "db" {} } } diff --git a/e2e/metrics/input/simpleweb.nomad b/e2e/metrics/input/simpleweb.nomad index 352f89bb0..92a20e1a3 
100644 --- a/e2e/metrics/input/simpleweb.nomad +++ b/e2e/metrics/input/simpleweb.nomad @@ -28,7 +28,7 @@ job "nginx" { network { mbits = 1 - port "http"{} + port "http" {} } } diff --git a/e2e/prometheus/prometheus.nomad b/e2e/prometheus/prometheus.nomad index c32d45d33..85b645443 100644 --- a/e2e/prometheus/prometheus.nomad +++ b/e2e/prometheus/prometheus.nomad @@ -64,7 +64,7 @@ EOH resources { network { mbits = 10 - port "prometheus_ui"{} + port "prometheus_ui" {} } } diff --git a/helper/testlog/testlog.go b/helper/testlog/testlog.go index 625c2cf9d..7a343b50c 100644 --- a/helper/testlog/testlog.go +++ b/helper/testlog/testlog.go @@ -72,7 +72,7 @@ func Logger(t LogPrinter) *log.Logger { } //HCLogger returns a new test hc-logger. -func HCLogger(t LogPrinter) hclog.MultiSinkLogger { +func HCLogger(t LogPrinter) hclog.InterceptLogger { level := hclog.Trace envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL") if envLogLevel != "" { @@ -83,7 +83,7 @@ func HCLogger(t LogPrinter) hclog.MultiSinkLogger { Output: NewWriter(t), IncludeLocation: true, } - return hclog.NewMultiSink(opts) + return hclog.NewInterceptLogger(opts) } type prefixStdout struct { diff --git a/nomad/client_monitor_endpoint.go b/nomad/client_monitor_endpoint.go new file mode 100644 index 000000000..bbe93ae16 --- /dev/null +++ b/nomad/client_monitor_endpoint.go @@ -0,0 +1,186 @@ +package nomad + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "strings" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/monitor" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" + + "github.com/ugorji/go/codec" +) + +type Monitor struct { + srv *Server +} + +func (m *Monitor) register() { + m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor) +} + +func (m *Monitor) monitor(conn io.ReadWriteCloser) { + defer conn.Close() + + // Decode args + var args cstructs.MonitorRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&args); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + // Check node read permissions + if aclObj, err := m.srv.ResolveToken(args.AuthToken); err != nil { + handleStreamResultError(err, nil, encoder) + return + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) { + handleStreamResultError(structs.ErrPermissionDenied, nil, encoder) + return + } + + var logLevel log.Level + if args.LogLevel == "" { + logLevel = log.LevelFromString("INFO") + } else { + logLevel = log.LevelFromString(args.LogLevel) + } + + if logLevel == log.NoLevel { + handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder) + return + } + + // Targeting a client so forward the request + if args.NodeID != "" { + nodeID := args.NodeID + + snap, err := m.srv.State().Snapshot() + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + if node == nil { + err := fmt.Errorf("Unknown node %q", nodeID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + if err := nodeSupportsRpc(node); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + // Get the Connection to the 
client either by fowarding to another server + // or creating direct stream + var clientConn net.Conn + state, ok := m.srv.getNodeConn(nodeID) + if !ok { + // Determine the server that has a connection to the node + srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region()) + if err != nil { + var code *int64 + if structs.IsErrNoNodeConn(err) { + code = helper.Int64ToPtr(404) + } + handleStreamResultError(err, code, encoder) + return + } + conn, err := m.srv.streamingRpc(srv, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + clientConn = conn + } else { + stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + clientConn = stream + } + defer clientConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, clientConn) + return + } + + // NodeID was empty, so monitor this current server + stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{ + Level: logLevel, + JSONFormat: false, + }) + + go func() { + if _, err := conn.Read(nil); err != nil { + close(stopCh) + cancel() + return + } + select { + case <-ctx.Done(): + return + } + }() + + logCh := monitor.Start(stopCh) + + var streamErr error +OUTER: + for { + select { + case log := <-logCh: + var resp cstructs.StreamErrWrapper + resp.Payload = log + if err := encoder.Encode(resp); err != nil { + streamErr = err + break OUTER + } + encoder.Reset(conn) + case <-ctx.Done(): + break OUTER + } + } + + if streamErr != nil { + // Nothing to do as conn is closed + if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") { + return + } + + // Attempt to send the error + encoder.Encode(&cstructs.StreamErrWrapper{ + Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)), + }) + return + } +} diff --git a/nomad/config.go b/nomad/config.go index cabf981ea..053408eca 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -76,7 +76,7 @@ type Config struct { LogOutput io.Writer // Logger is the logger used by the server. - Logger log.Logger + Logger log.InterceptLogger // ProtocolVersion is the protocol version to speak. This must be between // ProtocolVersionMin and ProtocolVersionMax. 
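
To make the wire protocol of the new Agent.Monitor endpoints concrete, here is a hypothetical consumer sketch modeled on the client test added in this patch: it drives a streaming handler over net.Pipe, encodes a MonitorRequest, and decodes StreamErrWrapper frames until the stream ends. The package and function names (monitorexample, consumeMonitor) are illustrative only.

package monitorexample

import (
	"fmt"
	"net"
	"os"

	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/ugorji/go/codec"
)

// consumeMonitor drives an Agent.Monitor streaming handler over an
// in-memory pipe and writes log payloads to stdout until the stream ends.
func consumeMonitor(handler structs.StreamingRpcHandler, req cstructs.MonitorRequest) error {
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	go handler(p2)

	// Send the request. An empty NodeID targets the local agent; a non-empty
	// one makes the server forward the stream to that client node.
	if err := codec.NewEncoder(p1, structs.MsgpackHandle).Encode(req); err != nil {
		return err
	}

	// Each frame is a StreamErrWrapper: Payload carries raw log bytes,
	// Error carries a terminal RPC error.
	decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
	for {
		var frame cstructs.StreamErrWrapper
		if err := decoder.Decode(&frame); err != nil {
			// io.EOF or a "closed" error means the peer shut the stream down.
			return err
		}
		if frame.Error != nil {
			return fmt.Errorf("stream error: %v", frame.Error)
		}
		os.Stdout.Write(frame.Payload)
	}
}
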
diff --git a/nomad/server.go b/nomad/server.go index 158d26a55..b6c1ef844 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -91,7 +91,7 @@ const ( type Server struct { config *Config - logger log.Logger + logger log.InterceptLogger // Connection pool to other Nomad servers connPool *pool.ConnPool @@ -252,6 +252,7 @@ type endpoints struct { // Client endpoints ClientStats *ClientStats FileSystem *FileSystem + Monitor *Monitor ClientAllocations *ClientAllocations } @@ -290,7 +291,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) } // Create the logger - logger := config.Logger.ResetNamed("nomad") + logger := config.Logger.ResetNamedIntercept("nomad") // Create the server s := &Server{ @@ -1044,6 +1045,9 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { // Streaming endpoints s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} s.staticEndpoints.FileSystem.register() + + s.staticEndpoints.Monitor = &Monitor{srv: s} + s.staticEndpoints.Monitor.register() } // Register the static handlers diff --git a/vendor/github.com/hashicorp/go-hclog/colorize_unix.go b/vendor/github.com/hashicorp/go-hclog/colorize_unix.go new file mode 100644 index 000000000..44aa9bf2c --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/colorize_unix.go @@ -0,0 +1,27 @@ +// +build !windows + +package hclog + +import ( + "github.com/mattn/go-isatty" +) + +// setColorization will mutate the values of this logger +// to approperately configure colorization options. It provides +// a wrapper to the output stream on Windows systems. +func (l *intLogger) setColorization(opts *LoggerOptions) { + switch opts.Color { + case ColorOff: + fallthrough + case ForceColor: + return + case AutoColor: + fi := l.checkWriterIsFile() + isUnixTerm := isatty.IsTerminal(fi.Fd()) + isCygwinTerm := isatty.IsCygwinTerminal(fi.Fd()) + isTerm := isUnixTerm || isCygwinTerm + if !isTerm { + l.writer.color = ColorOff + } + } +} diff --git a/vendor/github.com/hashicorp/go-hclog/colorize_windows.go b/vendor/github.com/hashicorp/go-hclog/colorize_windows.go new file mode 100644 index 000000000..23486b6d7 --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/colorize_windows.go @@ -0,0 +1,33 @@ +// +build windows + +package hclog + +import ( + "os" + + colorable "github.com/mattn/go-colorable" + "github.com/mattn/go-isatty" +) + +// setColorization will mutate the values of this logger +// to approperately configure colorization options. It provides +// a wrapper to the output stream on Windows systems. 
+func (l *intLogger) setColorization(opts *LoggerOptions) { + switch opts.Color { + case ColorOff: + return + case ForceColor: + fi := l.checkWriterIsFile() + l.writer.w = colorable.NewColorable(fi) + case AutoColor: + fi := l.checkWriterIsFile() + isUnixTerm := isatty.IsTerminal(os.Stdout.Fd()) + isCygwinTerm := isatty.IsCygwinTerminal(os.Stdout.Fd()) + isTerm := isUnixTerm || isCygwinTerm + if !isTerm { + l.writer.color = ColorOff + return + } + l.writer.w = colorable.NewColorable(fi) + } +} diff --git a/vendor/github.com/hashicorp/go-hclog/go.mod b/vendor/github.com/hashicorp/go-hclog/go.mod index 0d079a654..b6698c083 100644 --- a/vendor/github.com/hashicorp/go-hclog/go.mod +++ b/vendor/github.com/hashicorp/go-hclog/go.mod @@ -2,6 +2,11 @@ module github.com/hashicorp/go-hclog require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/color v1.7.0 + github.com/mattn/go-colorable v0.1.4 + github.com/mattn/go-isatty v0.0.10 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.2 ) + +go 1.13 diff --git a/vendor/github.com/hashicorp/go-hclog/go.sum b/vendor/github.com/hashicorp/go-hclog/go.sum index e03ee77d9..9cee2196c 100644 --- a/vendor/github.com/hashicorp/go-hclog/go.sum +++ b/vendor/github.com/hashicorp/go-hclog/go.sum @@ -1,6 +1,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10= +github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/vendor/github.com/hashicorp/go-hclog/interceptlogger.go b/vendor/github.com/hashicorp/go-hclog/interceptlogger.go new file mode 100644 index 000000000..a65f7be19 --- /dev/null +++ b/vendor/github.com/hashicorp/go-hclog/interceptlogger.go @@ -0,0 +1,195 @@ +package hclog + +import ( + "sync" + "sync/atomic" +) + +var _ Logger = &interceptLogger{} + +type interceptLogger struct { + Logger + + sync.Mutex + sinkCount *int32 + Sinks map[SinkAdapter]struct{} +} + +func NewInterceptLogger(opts *LoggerOptions) InterceptLogger { + intercept := &interceptLogger{ + Logger: New(opts), + sinkCount: new(int32), + Sinks: make(map[SinkAdapter]struct{}), + } + + atomic.StoreInt32(intercept.sinkCount, 0) + + return intercept +} + +// Emit the message and args at TRACE level to log and sinks 
+func (i *interceptLogger) Trace(msg string, args ...interface{}) { + i.Logger.Trace(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Trace, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at DEBUG level to log and sinks +func (i *interceptLogger) Debug(msg string, args ...interface{}) { + i.Logger.Debug(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Debug, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at INFO level to log and sinks +func (i *interceptLogger) Info(msg string, args ...interface{}) { + i.Logger.Info(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Info, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at WARN level to log and sinks +func (i *interceptLogger) Warn(msg string, args ...interface{}) { + i.Logger.Warn(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Warn, msg, i.retrieveImplied(args...)...) + } +} + +// Emit the message and args at ERROR level to log and sinks +func (i *interceptLogger) Error(msg string, args ...interface{}) { + i.Logger.Error(msg, args...) + if atomic.LoadInt32(i.sinkCount) == 0 { + return + } + + i.Lock() + defer i.Unlock() + for s := range i.Sinks { + s.Accept(i.Name(), Error, msg, i.retrieveImplied(args...)...) + } +} + +func (i *interceptLogger) retrieveImplied(args ...interface{}) []interface{} { + top := i.Logger.ImpliedArgs() + + cp := make([]interface{}, len(top)+len(args)) + copy(cp, top) + copy(cp[len(top):], args) + + return cp +} + +// Create a new sub-Logger that a name decending from the current name. +// This is used to create a subsystem specific Logger. +// Registered sinks will subscribe to these messages as well. +func (i *interceptLogger) Named(name string) Logger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.Named(name) + + return &sub +} + +// Create a new sub-Logger with an explicit name. This ignores the current +// name. This is used to create a standalone logger that doesn't fall +// within the normal hierarchy. Registered sinks will subscribe +// to these messages as well. +func (i *interceptLogger) ResetNamed(name string) Logger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.ResetNamed(name) + + return &sub +} + +// Create a new sub-Logger that a name decending from the current name. +// This is used to create a subsystem specific Logger. +// Registered sinks will subscribe to these messages as well. +func (i *interceptLogger) NamedIntercept(name string) InterceptLogger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.Named(name) + + return &sub +} + +// Create a new sub-Logger with an explicit name. This ignores the current +// name. This is used to create a standalone logger that doesn't fall +// within the normal hierarchy. Registered sinks will subscribe +// to these messages as well. +func (i *interceptLogger) ResetNamedIntercept(name string) InterceptLogger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.ResetNamed(name) + + return &sub +} + +// Return a sub-Logger for which every emitted log message will contain +// the given key/value pairs. 
This is used to create a context specific +// Logger. +func (i *interceptLogger) With(args ...interface{}) Logger { + var sub interceptLogger + + sub = *i + + sub.Logger = i.Logger.With(args...) + + return &sub +} + +// RegisterSink attaches a SinkAdapter to interceptLoggers sinks. +func (i *interceptLogger) RegisterSink(sink SinkAdapter) { + i.Lock() + defer i.Unlock() + + i.Sinks[sink] = struct{}{} + + atomic.AddInt32(i.sinkCount, 1) +} + +// DeregisterSink removes a SinkAdapter from interceptLoggers sinks. +func (i *interceptLogger) DeregisterSink(sink SinkAdapter) { + i.Lock() + defer i.Unlock() + + delete(i.Sinks, sink) + + atomic.AddInt32(i.sinkCount, -1) +} diff --git a/vendor/github.com/hashicorp/go-hclog/intlogger.go b/vendor/github.com/hashicorp/go-hclog/intlogger.go index a94c1424e..91b038738 100644 --- a/vendor/github.com/hashicorp/go-hclog/intlogger.go +++ b/vendor/github.com/hashicorp/go-hclog/intlogger.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "os" "reflect" "runtime" "sort" @@ -15,6 +16,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/fatih/color" ) // TimeFormat to use for logging. This is a version of RFC3339 that contains @@ -32,11 +35,18 @@ var ( Warn: "[WARN] ", Error: "[ERROR]", } + + _levelToColor = map[Level]*color.Color{ + Debug: color.New(color.FgHiWhite), + Trace: color.New(color.FgHiGreen), + Info: color.New(color.FgHiBlue), + Warn: color.New(color.FgHiYellow), + Error: color.New(color.FgHiRed), + } ) // Make sure that intLogger is a Logger var _ Logger = &intLogger{} -var _ MultiSinkLogger = &intLogger{} // intLogger is an internal logger implementation. Internal in that it is // defined entirely by this package. @@ -52,13 +62,21 @@ type intLogger struct { writer *writer level *int32 - sinks map[Logger]struct{} - implied []interface{} } // New returns a configured logger. func New(opts *LoggerOptions) Logger { + return newLogger(opts) +} + +// NewSinkAdapter returns a SinkAdapter with configured settings +// defined by LoggerOptions +func NewSinkAdapter(opts *LoggerOptions) SinkAdapter { + return newLogger(opts) +} + +func newLogger(opts *LoggerOptions) *intLogger { if opts == nil { opts = &LoggerOptions{} } @@ -84,11 +102,12 @@ func New(opts *LoggerOptions) Logger { name: opts.Name, timeFormat: TimeFormat, mutex: mutex, - writer: newWriter(output), + writer: newWriter(output, opts.Color), level: new(int32), - sinks: make(map[Logger]struct{}), } + l.setColorization(opts) + if opts.TimeFormat != "" { l.timeFormat = opts.TimeFormat } @@ -98,31 +117,10 @@ func New(opts *LoggerOptions) Logger { return l } -func NewMultiSink(opts *LoggerOptions) MultiSinkLogger { - return New(opts).(MultiSinkLogger) -} - -func (l *intLogger) RegisterSink(logger Logger) { - l.mutex.Lock() - defer l.mutex.Unlock() - - if _, ok := l.sinks[logger]; ok { - return - } - - l.sinks[logger] = struct{}{} -} - -func (l *intLogger) DeregisterSink(logger Logger) { - l.mutex.Lock() - defer l.mutex.Unlock() - delete(l.sinks, logger) -} - // Log a message and a set of key/value pairs if the given level is at // or more severe that the threshold configured in the Logger. 
-func (l *intLogger) Log(level Level, msg string, args ...interface{}) { - if level < Level(atomic.LoadInt32(l.level)) && len(l.sinks) == 0 { +func (l *intLogger) Log(name string, level Level, msg string, args ...interface{}) { + if level < Level(atomic.LoadInt32(l.level)) { return } @@ -131,36 +129,10 @@ func (l *intLogger) Log(level Level, msg string, args ...interface{}) { l.mutex.Lock() defer l.mutex.Unlock() - for lh := range l.sinks { - lh, ok := lh.(*intLogger) - if !ok { - continue - } - - if level < Level(atomic.LoadInt32(lh.level)) { - continue - } - - // Set the sink name to the name of the calling log - lh.name = l.name - - if lh.json { - lh.logJSON(t, level, msg, args...) - } else { - lh.log(t, level, msg, args...) - } - - lh.writer.Flush(level) - } - - if level < Level(atomic.LoadInt32(l.level)) { - return - } - if l.json { - l.logJSON(t, level, msg, args...) + l.logJSON(t, name, level, msg, args...) } else { - l.log(t, level, msg, args...) + l.log(t, name, level, msg, args...) } l.writer.Flush(level) @@ -196,7 +168,7 @@ func trimCallerPath(path string) string { } // Non-JSON logging format function -func (l *intLogger) log(t time.Time, level Level, msg string, args ...interface{}) { +func (l *intLogger) log(t time.Time, name string, level Level, msg string, args ...interface{}) { l.writer.WriteString(t.Format(l.timeFormat)) l.writer.WriteByte(' ') @@ -219,8 +191,8 @@ func (l *intLogger) log(t time.Time, level Level, msg string, args ...interface{ l.writer.WriteByte(' ') - if l.name != "" { - l.writer.WriteString(l.name) + if name != "" { + l.writer.WriteString(name) l.writer.WriteString(": ") } @@ -349,8 +321,8 @@ func (l *intLogger) renderSlice(v reflect.Value) string { } // JSON logging function -func (l *intLogger) logJSON(t time.Time, level Level, msg string, args ...interface{}) { - vals := l.jsonMapEntry(t, level, msg) +func (l *intLogger) logJSON(t time.Time, name string, level Level, msg string, args ...interface{}) { + vals := l.jsonMapEntry(t, name, level, msg) args = append(l.implied, args...) if args != nil && len(args) > 0 { @@ -392,7 +364,7 @@ func (l *intLogger) logJSON(t time.Time, level Level, msg string, args ...interf err := json.NewEncoder(l.writer).Encode(vals) if err != nil { if _, ok := err.(*json.UnsupportedTypeError); ok { - plainVal := l.jsonMapEntry(t, level, msg) + plainVal := l.jsonMapEntry(t, name, level, msg) plainVal["@warn"] = errJsonUnsupportedTypeMsg json.NewEncoder(l.writer).Encode(plainVal) @@ -400,7 +372,7 @@ func (l *intLogger) logJSON(t time.Time, level Level, msg string, args ...interf } } -func (l intLogger) jsonMapEntry(t time.Time, level Level, msg string) map[string]interface{} { +func (l intLogger) jsonMapEntry(t time.Time, name string, level Level, msg string) map[string]interface{} { vals := map[string]interface{}{ "@message": msg, "@timestamp": t.Format("2006-01-02T15:04:05.000000Z07:00"), @@ -424,8 +396,8 @@ func (l intLogger) jsonMapEntry(t time.Time, level Level, msg string) map[string vals["@level"] = levelStr - if l.name != "" { - vals["@module"] = l.name + if name != "" { + vals["@module"] = name } if l.caller { @@ -438,27 +410,27 @@ func (l intLogger) jsonMapEntry(t time.Time, level Level, msg string) map[string // Emit the message and args at DEBUG level func (l *intLogger) Debug(msg string, args ...interface{}) { - l.Log(Debug, msg, args...) + l.Log(l.Name(), Debug, msg, args...) } // Emit the message and args at TRACE level func (l *intLogger) Trace(msg string, args ...interface{}) { - l.Log(Trace, msg, args...) 
+ l.Log(l.Name(), Trace, msg, args...) } // Emit the message and args at INFO level func (l *intLogger) Info(msg string, args ...interface{}) { - l.Log(Info, msg, args...) + l.Log(l.Name(), Info, msg, args...) } // Emit the message and args at WARN level func (l *intLogger) Warn(msg string, args ...interface{}) { - l.Log(Warn, msg, args...) + l.Log(l.Name(), Warn, msg, args...) } // Emit the message and args at ERROR level func (l *intLogger) Error(msg string, args ...interface{}) { - l.Log(Error, msg, args...) + l.Log(l.Name(), Error, msg, args...) } // Indicate that the logger would emit TRACE level logs @@ -576,3 +548,28 @@ func (l *intLogger) StandardWriter(opts *StandardLoggerOptions) io.Writer { forceLevel: opts.ForceLevel, } } + +// checks if the underlying io.Writer is a file, and +// panics if not. For use by colorization. +func (l *intLogger) checkWriterIsFile() *os.File { + fi, ok := l.writer.w.(*os.File) + if !ok { + panic("Cannot enable coloring of non-file Writers") + } + return fi +} + +// Accept implements the SinkAdapter interface +func (i *intLogger) Accept(name string, level Level, msg string, args ...interface{}) { + i.Log(name, level, msg, args...) +} + +// ImpliedArgs returns the loggers implied args +func (i *intLogger) ImpliedArgs() []interface{} { + return i.implied +} + +// Name returns the loggers name +func (i *intLogger) Name() string { + return i.name +} diff --git a/vendor/github.com/hashicorp/go-hclog/logger.go b/vendor/github.com/hashicorp/go-hclog/logger.go index a7774e923..81045cbfe 100644 --- a/vendor/github.com/hashicorp/go-hclog/logger.go +++ b/vendor/github.com/hashicorp/go-hclog/logger.go @@ -53,6 +53,21 @@ func Fmt(str string, args ...interface{}) Format { return append(Format{str}, args...) } +// ColorOption expresses how the output should be colored, if at all. +type ColorOption uint8 + +const ( + // ColorOff is the default coloration, and does not + // inject color codes into the io.Writer. + ColorOff ColorOption = iota + // AutoColor checks if the io.Writer is a tty, + // and if so enables coloring. + AutoColor + // ForceColor will enable coloring, regardless of whether + // the io.Writer is a tty or not. + ForceColor +) + // LevelFromString returns a Level type for the named log level, or "NoLevel" if // the level string is invalid. This facilitates setting the log level via // config or environment variable by name in a predictable way. @@ -111,9 +126,14 @@ type Logger interface { // Indicate if ERROR logs would be emitted. This and the other Is* guards IsError() bool + ImpliedArgs() []interface{} + // Creates a sublogger that will always have the given key/value pairs With(args ...interface{}) Logger + // Returns the Name of the logger + Name() string + // Create a logger that will prepend the name string on the front of all messages. // If the logger already has a name, the new value will be appended to the current // name. That way, a major subsystem can use this to decorate all it's own logs @@ -136,17 +156,6 @@ type Logger interface { StandardWriter(opts *StandardLoggerOptions) io.Writer } -// MultiSinkLogger describes the interface that allows a logger to -// write to multiple sub loggers which may be configured to have different -// level and writer settings. 
This is useful for monitor commands to allow -// for streaming of logs at a lower level than what is set for the parent logger -type MultiSinkLogger interface { - Logger - - RegisterSink(logger Logger) - DeregisterSink(logger Logger) -} - // StandardLoggerOptions can be used to configure a new standard logger. type StandardLoggerOptions struct { // Indicate that some minimal parsing should be done on strings to try @@ -184,4 +193,41 @@ type LoggerOptions struct { // The time format to use instead of the default TimeFormat string + + // Color the output. On Windows, colored logs are only avaiable for io.Writers that + // are concretely instances of *os.File. + Color ColorOption +} + +// InterceptLogger describes the interface for using a logger +// that can register different output sinks. +// This is useful for sending lower level log messages +// to a different output while keeping the root logger +// at a higher one. +type InterceptLogger interface { + // Logger is the root logger for an InterceptLogger + Logger + + // RegisterSink adds a SinkAdapter to the InterceptLogger + RegisterSink(sink SinkAdapter) + + // DeregisterSink removes a SinkAdapter from the InterceptLogger + DeregisterSink(sink SinkAdapter) + + // Create a interceptlogger that will prepend the name string on the front of all messages. + // If the logger already has a name, the new value will be appended to the current + // name. That way, a major subsystem can use this to decorate all it's own logs + // without losing context. + NamedIntercept(name string) InterceptLogger + + // Create a interceptlogger that will prepend the name string on the front of all messages. + // This sets the name of the logger to the value directly, unlike Named which honor + // the current name as well. 
+ ResetNamedIntercept(name string) InterceptLogger +} + +// SinkAdapter describes the interface that must be implemented +// in order to Register a new sink to an InterceptLogger +type SinkAdapter interface { + Accept(name string, level Level, msg string, args ...interface{}) } diff --git a/vendor/github.com/hashicorp/go-hclog/nulllogger.go b/vendor/github.com/hashicorp/go-hclog/nulllogger.go index 7ad6b351e..4abdd5583 100644 --- a/vendor/github.com/hashicorp/go-hclog/nulllogger.go +++ b/vendor/github.com/hashicorp/go-hclog/nulllogger.go @@ -35,8 +35,12 @@ func (l *nullLogger) IsWarn() bool { return false } func (l *nullLogger) IsError() bool { return false } +func (l *nullLogger) ImpliedArgs() []interface{} { return []interface{}{} } + func (l *nullLogger) With(args ...interface{}) Logger { return l } +func (l *nullLogger) Name() string { return "" } + func (l *nullLogger) Named(name string) Logger { return l } func (l *nullLogger) ResetNamed(name string) Logger { return l } diff --git a/vendor/github.com/hashicorp/go-hclog/writer.go b/vendor/github.com/hashicorp/go-hclog/writer.go index 7e8ec729d..421a1f06c 100644 --- a/vendor/github.com/hashicorp/go-hclog/writer.go +++ b/vendor/github.com/hashicorp/go-hclog/writer.go @@ -6,19 +6,27 @@ import ( ) type writer struct { - b bytes.Buffer - w io.Writer + b bytes.Buffer + w io.Writer + color ColorOption } -func newWriter(w io.Writer) *writer { - return &writer{w: w} +func newWriter(w io.Writer, color ColorOption) *writer { + return &writer{w: w, color: color} } func (w *writer) Flush(level Level) (err error) { + var unwritten = w.b.Bytes() + + if w.color != ColorOff { + color := _levelToColor[level] + unwritten = []byte(color.Sprintf("%s", unwritten)) + } + if lw, ok := w.w.(LevelWriter); ok { - _, err = lw.LevelWrite(level, w.b.Bytes()) + _, err = lw.LevelWrite(level, unwritten) } else { - _, err = w.w.Write(w.b.Bytes()) + _, err = w.w.Write(unwritten) } w.b.Reset() return err diff --git a/vendor/vendor.json b/vendor/vendor.json index 95ef75246..9a41d61e7 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -209,7 +209,7 @@ {"path":"github.com/hashicorp/go-envparse","checksumSHA1":"FKmqR4DC3nCXtnT9pe02z5CLNWo=","revision":"310ca1881b22af3522e3a8638c0b426629886196","revisionTime":"2018-01-19T21:58:41Z"}, {"path":"github.com/hashicorp/go-getter","checksumSHA1":"d4brua17AGQqMNtngK4xKOUwboY=","revision":"f5101da0117392c6e7960c934f05a2fd689a5b5f","revisionTime":"2019-08-22T19:45:07Z"}, {"path":"github.com/hashicorp/go-getter/helper/url","checksumSHA1":"9J+kDr29yDrwsdu2ULzewmqGjpA=","revision":"b345bfcec894fb7ff3fdf9b21baf2f56ea423d98","revisionTime":"2018-04-10T17:49:45Z"}, - {"path":"github.com/hashicorp/go-hclog","checksumSHA1":"uTAjKuGQr4/gpcgdEtTO+JhD/NY=","revision":"a4c7052ea48d1c284eca6ba6281910f0fd3b7b30","revisionTime":"2019-10-10T18:01:30Z","version":"f-multi-sink-logger","versionExact":"f-multi-sink-logger"}, + {"path":"github.com/hashicorp/go-hclog","checksumSHA1":"+gkR2S9qSVVy3gor4p0Z86RMgZI=","revision":"2bec91ee9db63c8546a105f89da3eb63037c5e6c","revisionTime":"2019-10-18T20:14:58Z","version":"f-multi-sink","versionExact":"f-multi-sink"}, {"path":"github.com/hashicorp/go-immutable-radix","checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"}, {"path":"github.com/hashicorp/go-memdb","checksumSHA1":"FMAvwDar2bQyYAW4XMFhAt0J5xA=","revision":"20ff6434c1cc49b80963d45bf5c6aa89c78d8d57","revisionTime":"2017-08-31T20:15:40Z"}, 
{"path":"github.com/hashicorp/go-msgpack/codec","checksumSHA1":"CKGYNUDKre3Z2g4hHNVfp5nTcfA=","revision":"23165f7bc3c2dda1891434ebb9da1511a7bafc1c","revisionTime":"2019-09-27T12:33:13Z","version":"upstream-08f7b40","versionExact":"upstream-08f7b40"},