New monitor pkg for shared monitor functionality

Adds new package that can be used by client and server RPC endpoints to
facilitate monitoring based off of a logger

clean up old code

small comment about write

rm old comment about minsize

rename to Monitor

Removes connection logic from monitor command

Keep connection logic in endpoints, use a channel to send results from
monitoring

use new multisink logger and interfaces

small test for dropped messages

update go-hclogger and update sink/intercept logger interfaces
This commit is contained in:
Drew Bailey
2019-10-15 15:14:25 -04:00
parent 890b8a43fb
commit 8095b4868a
34 changed files with 969 additions and 338 deletions

View File

@@ -163,8 +163,8 @@ type Client struct {
configCopy *config.Config
configLock sync.RWMutex
logger hclog.MultiSinkLogger
rpcLogger hclog.MultiSinkLogger
logger hclog.InterceptLogger
rpcLogger hclog.Logger
connPool *pool.ConnPool
@@ -304,7 +304,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
}
// Create the logger
logger := cfg.Logger.ResetNamed("client").(hclog.MultiSinkLogger)
logger := cfg.Logger.ResetNamedIntercept("client")
// Create the client
c := &Client{
@@ -316,7 +316,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistry(),
logger: logger,
rpcLogger: logger.Named("rpc").(hclog.MultiSinkLogger),
rpcLogger: logger.Named("rpc"),
allocs: make(map[string]AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),

View File

@@ -81,7 +81,7 @@ type Config struct {
LogOutput io.Writer
// Logger provides a logger to thhe client
Logger log.MultiSinkLogger
Logger log.InterceptLogger
// Region is the clients region
Region string

View File

@@ -4,10 +4,11 @@ import (
"context"
"errors"
"io"
"sync"
"strings"
"time"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
@@ -62,39 +63,36 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
return
}
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer close(stopCh)
defer cancel()
streamWriter := newStreamWriter(512)
streamLog := log.New(&log.LoggerOptions{
Level: logLevel,
Output: streamWriter,
monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
Level: logLevel,
JSONFormat: false,
})
m.c.logger.RegisterSink(streamLog)
defer m.c.logger.DeregisterSink(streamLog)
go func() {
for {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
select {
case <-ctx.Done():
return
}
if _, err := conn.Read(nil); err != nil {
close(stopCh)
cancel()
return
}
select {
case <-ctx.Done():
return
}
}()
logCh := monitor.Start(stopCh)
var streamErr error
OUTER:
for {
select {
case log := <-streamWriter.logCh:
case log := <-logCh:
var resp cstructs.StreamErrWrapper
resp.Payload = log
if err := encoder.Encode(resp); err != nil {
streamErr = err
@@ -107,46 +105,15 @@ OUTER:
}
if streamErr != nil {
handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
// Nothing to do as conn is closed
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
// Attempt to send the error
encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)),
})
return
}
}
type streamWriter struct {
sync.Mutex
logs []string
logCh chan []byte
index int
droppedCount int
}
func newStreamWriter(buf int) *streamWriter {
return &streamWriter{
logs: make([]string, buf),
logCh: make(chan []byte, buf),
index: 0,
}
}
func (d *streamWriter) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()
// Strip off newlines at the end if there are any since we store
// individual log lines in the agent.
// n = len(p)
// if p[n-1] == '\n' {
// p = p[:n-1]
// }
d.logs[d.index] = string(p)
d.index = (d.index + 1) % len(d.logs)
select {
case d.logCh <- p:
default:
d.droppedCount++
}
return
}

View File

@@ -0,0 +1,95 @@
package client
import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
func TestMonitor_Monitor(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server and client
s := nomad.TestServer(t, nil)
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer cleanup()
req := cstructs.MonitorRequest{
LogLevel: "debug",
NodeID: c.NodeID(),
}
handler, err := c.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(1 * time.Second)
expected := "[DEBUG]"
received := ""
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
received += string(msg.Payload)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
}
}
}
}

View File

@@ -41,6 +41,9 @@ type MonitorRequest struct {
// LogJSON specifies if log format should be unstructured or json
LogJSON bool
// NodeID is the node we want to track the logs of
NodeID string
structs.QueryOptions
}

View File

@@ -53,8 +53,8 @@ type Agent struct {
config *Config
configLock sync.Mutex
logger log.MultiSinkLogger
httpLogger log.MultiSinkLogger
logger log.InterceptLogger
httpLogger log.Logger
logOutput io.Writer
// consulService is Nomad's custom Consul client for managing services
@@ -87,7 +87,7 @@ type Agent struct {
}
// NewAgent is used to create a new agent with the given configuration
func NewAgent(config *Config, logger log.MultiSinkLogger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) {
func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) {
a := &Agent{
config: config,
logOutput: logOutput,
@@ -97,7 +97,7 @@ func NewAgent(config *Config, logger log.MultiSinkLogger, logOutput io.Writer, i
// Create the loggers
a.logger = logger
a.httpLogger = a.logger.ResetNamed("http").(log.MultiSinkLogger)
a.httpLogger = a.logger.ResetNamed("http")
// Global logger should match internal logger as much as possible
golog.SetFlags(golog.LstdFlags | golog.Lmicroseconds)

View File

@@ -10,7 +10,6 @@ import (
"net/http"
"sort"
"strings"
"sync"
"github.com/docker/docker/pkg/ioutils"
log "github.com/hashicorp/go-hclog"
@@ -175,182 +174,102 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel))
}
// START
// Determine if we are targeting a server or client
nodeID := req.URL.Query().Get("nodeID")
if nodeID != "" {
// Build the request and parse the ACL token
args := cstructs.MonitorRequest{
LogLevel: logLevel,
LogJSON: false,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Build the request and parse the ACL token
args := cstructs.MonitorRequest{
NodeID: nodeID,
LogLevel: logLevel,
LogJSON: false,
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID)
// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID)
// Make the RPC
var handler structs.StreamingRpcHandler
var handlerErr error
if useLocalClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor")
} else if useClientRPC {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor")
} else if useServerRPC {
handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor")
} else {
handlerErr = CodedError(400, "No local Node and node_id not provided")
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
httpPipe.Close()
}()
// Create an ouput that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
// Create a channel that decodes the results
errCh := make(chan HTTPCodedError, 2)
// stream the response
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}
for {
select {
case <-ctx.Done():
errCh <- nil
return
default:
}
var res cstructs.StreamErrWrapper
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
}
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()
handler(handlerPipe)
cancel()
codedErr := <-errCh
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||
strings.Contains(codedErr.Error(), "EOF")) {
codedErr = nil
}
return nil, codedErr
// Make the RPC
var handler structs.StreamingRpcHandler
var handlerErr error
if useLocalClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler("Agent.Monitor")
} else if useClientRPC {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Agent.Monitor")
} else if useServerRPC {
handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor")
} else {
// Create flusher for streaming
flusher, ok := resp.(http.Flusher)
if !ok {
return nil, CodedError(400, "Streaming not supported")
handlerErr = CodedError(400, "No local Node and node_id not provided")
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
httpPipe.Close()
}()
// Create an ouput that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)
// Create a channel that decodes the results
errCh := make(chan HTTPCodedError, 2)
// stream the response
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}
streamWriter := newStreamWriter(512)
streamLog := log.New(&log.LoggerOptions{
Level: log.LevelFromString(logLevel),
Output: streamWriter,
})
s.agent.logger.RegisterSink(streamLog)
defer s.agent.logger.DeregisterSink(streamLog)
notify := resp.(http.CloseNotifier).CloseNotify()
// Send header so client can start streaming body
resp.WriteHeader(http.StatusOK)
// gziphanlder needs a byte to be written and flushed in order
// to tell gzip handler to ignore this response and not compress
resp.Write([]byte("\n"))
flusher.Flush()
for {
select {
case <-notify:
s.agent.logger.DeregisterSink(streamLog)
if streamWriter.droppedCount > 0 {
s.agent.logger.Warn(fmt.Sprintf("Dropped %d logs during monitor request", streamWriter.droppedCount))
case <-ctx.Done():
errCh <- nil
return
default:
}
var res cstructs.StreamErrWrapper
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)
if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
return nil, nil
case log := <-streamWriter.logCh:
fmt.Fprintln(resp, log)
flusher.Flush()
}
if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()
handler(handlerPipe)
cancel()
codedErr := <-errCh
if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), "closed") ||
strings.Contains(codedErr.Error(), "EOF")) {
codedErr = nil
}
return nil, nil
}
type streamWriter struct {
sync.Mutex
logs []string
logCh chan string
index int
droppedCount int
}
func newStreamWriter(buf int) *streamWriter {
return &streamWriter{
logs: make([]string, buf),
logCh: make(chan string, buf),
index: 0,
}
}
func (d *streamWriter) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()
// Strip off newlines at the end if there are any since we store
// individual log lines in the agent.
n = len(p)
if p[n-1] == '\n' {
p = p[:n-1]
}
d.logs[d.index] = string(p)
d.index = (d.index + 1) % len(d.logs)
select {
case d.logCh <- string(p):
default:
d.droppedCount++
}
return
return nil, codedErr
}
func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@@ -447,7 +447,7 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, io.Writer)
}
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logger hclog.MultiSinkLogger, logOutput io.Writer, inmem *metrics.InmemSink) error {
func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) error {
c.Ui.Output("Starting Nomad agent...")
agent, err := NewAgent(config, logger, logOutput, inmem)
if err != nil {
@@ -602,7 +602,7 @@ func (c *Command) Run(args []string) int {
}
// Create logger
logger := hclog.NewMultiSink(&hclog.LoggerOptions{
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: "agent",
Level: hclog.LevelFromString(config.LogLevel),
Output: logOutput,

View File

@@ -106,8 +106,6 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
srv.registerHandlers(config.EnableDebug)
// Handle requests with gzip compression
// Use MinSize of 1 to allow a zero byte flush to return
// response header used for streaming
gzip, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(0))
if err != nil {
return nil, err

View File

@@ -0,0 +1,72 @@
package monitor
import (
"sync"
log "github.com/hashicorp/go-hclog"
)
type Monitor struct {
sync.Mutex
sink log.SinkAdapter
logger log.InterceptLogger
logCh chan []byte
index int
droppedCount int
bufSize int
}
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor {
sw := &Monitor{
logger: logger,
logCh: make(chan []byte, buf),
index: 0,
bufSize: buf,
}
opts.Output = sw
sink := log.NewSinkAdapter(opts)
sw.sink = sink
return sw
}
func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
d.logger.RegisterSink(d.sink)
logCh := make(chan []byte, d.bufSize)
go func() {
for {
select {
case log := <-d.logCh:
logCh <- log
case <-stopCh:
d.logger.DeregisterSink(d.sink)
close(d.logCh)
return
}
}
}()
return logCh
}
// Write attemps to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *Monitor) Write(p []byte) (n int, err error) {
bytes := make([]byte, len(p))
copy(bytes, p)
select {
case d.logCh <- bytes:
default:
d.Lock()
defer d.Unlock()
d.droppedCount++
if d.droppedCount > 10 {
d.logger.Warn("Monitor dropped %d logs during monitor request", d.droppedCount)
d.droppedCount = 0
}
}
return
}

View File

@@ -0,0 +1,62 @@
package monitor
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
log "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestMonitor_Start(t *testing.T) {
t.Parallel()
logger := log.NewInterceptLogger(&log.LoggerOptions{
Level: log.Error,
})
m := New(512, logger, &log.LoggerOptions{
Level: log.Debug,
})
closeCh := make(chan struct{})
defer close(closeCh)
logCh := m.Start(closeCh)
go func() {
for {
select {
case log := <-logCh:
require.Contains(t, string(log), "[DEBUG] test log")
case <-time.After(1 * time.Second):
t.Fatal("Expected to receive from log channel")
}
}
}()
logger.Debug("test log")
}
func TestMonitor_DroppedMessages(t *testing.T) {
t.Parallel()
logger := log.NewInterceptLogger(&log.LoggerOptions{
Level: log.Warn,
})
m := New(5, logger, &log.LoggerOptions{
Level: log.Debug,
})
doneCh := make(chan struct{})
defer close(doneCh)
m.Start(doneCh)
for i := 0; i <= 6; i++ {
logger.Debug("test message")
}
assert.Equal(t, 1, m.droppedCount)
}

View File

@@ -216,7 +216,7 @@ func (a *TestAgent) start() (*Agent, error) {
return nil, fmt.Errorf("unable to set up in memory metrics needed for agent initialization")
}
logger := hclog.NewMultiSink(&hclog.LoggerOptions{
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: "agent",
Level: hclog.LevelFromString(a.Config.LogLevel),
Output: a.LogOutput,

View File

@@ -19,7 +19,7 @@ job "example" {
network {
mbits = 10
port "db" {}
port "db" {}
}
}
}

View File

@@ -316,7 +316,7 @@ job "example" {
network {
mbits = 10
port "db" {}
port "db" {}
}
}
# The "service" stanza instructs Nomad to register this task as a service

View File

@@ -23,7 +23,7 @@ job "client" {
network {
mbits = 10
port "http"{}
port "http" {}
}
}

View File

@@ -49,7 +49,7 @@ job "consul-example" {
network {
mbits = 10
port "db" {}
port "db" {}
}
}

View File

@@ -29,7 +29,7 @@ job "hello" {
network {
mbits = 10
port "web" {}
port "web" {}
}
}

View File

@@ -39,7 +39,7 @@ job "redis" {
network {
mbits = 10
port "db" {}
port "db" {}
}
}

View File

@@ -28,7 +28,7 @@ job "nginx" {
network {
mbits = 1
port "http"{}
port "http" {}
}
}

View File

@@ -64,7 +64,7 @@ EOH
resources {
network {
mbits = 10
port "prometheus_ui"{}
port "prometheus_ui" {}
}
}

View File

@@ -72,7 +72,7 @@ func Logger(t LogPrinter) *log.Logger {
}
//HCLogger returns a new test hc-logger.
func HCLogger(t LogPrinter) hclog.MultiSinkLogger {
func HCLogger(t LogPrinter) hclog.InterceptLogger {
level := hclog.Trace
envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL")
if envLogLevel != "" {
@@ -83,7 +83,7 @@ func HCLogger(t LogPrinter) hclog.MultiSinkLogger {
Output: NewWriter(t),
IncludeLocation: true,
}
return hclog.NewMultiSink(opts)
return hclog.NewInterceptLogger(opts)
}
type prefixStdout struct {

View File

@@ -0,0 +1,186 @@
package nomad
import (
"context"
"errors"
"fmt"
"io"
"net"
"strings"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
type Monitor struct {
srv *Server
}
func (m *Monitor) register() {
m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor)
}
func (m *Monitor) monitor(conn io.ReadWriteCloser) {
defer conn.Close()
// Decode args
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
// Check node read permissions
if aclObj, err := m.srv.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, nil, encoder)
return
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
return
}
var logLevel log.Level
if args.LogLevel == "" {
logLevel = log.LevelFromString("INFO")
} else {
logLevel = log.LevelFromString(args.LogLevel)
}
if logLevel == log.NoLevel {
handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder)
return
}
// Targeting a client so forward the request
if args.NodeID != "" {
nodeID := args.NodeID
snap, err := m.srv.State().Snapshot()
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if err := nodeSupportsRpc(node); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
// Get the Connection to the client either by fowarding to another server
// or creating direct stream
var clientConn net.Conn
state, ok := m.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
if err != nil {
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
handleStreamResultError(err, code, encoder)
return
}
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
clientConn = stream
}
defer clientConn.Close()
// Send the Request
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, clientConn)
return
}
// NodeID was empty, so monitor this current server
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
Level: logLevel,
JSONFormat: false,
})
go func() {
if _, err := conn.Read(nil); err != nil {
close(stopCh)
cancel()
return
}
select {
case <-ctx.Done():
return
}
}()
logCh := monitor.Start(stopCh)
var streamErr error
OUTER:
for {
select {
case log := <-logCh:
var resp cstructs.StreamErrWrapper
resp.Payload = log
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}
if streamErr != nil {
// Nothing to do as conn is closed
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
// Attempt to send the error
encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)),
})
return
}
}

View File

@@ -76,7 +76,7 @@ type Config struct {
LogOutput io.Writer
// Logger is the logger used by the server.
Logger log.Logger
Logger log.InterceptLogger
// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.

View File

@@ -91,7 +91,7 @@ const (
type Server struct {
config *Config
logger log.Logger
logger log.InterceptLogger
// Connection pool to other Nomad servers
connPool *pool.ConnPool
@@ -252,6 +252,7 @@ type endpoints struct {
// Client endpoints
ClientStats *ClientStats
FileSystem *FileSystem
Monitor *Monitor
ClientAllocations *ClientAllocations
}
@@ -290,7 +291,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
}
// Create the logger
logger := config.Logger.ResetNamed("nomad")
logger := config.Logger.ResetNamedIntercept("nomad")
// Create the server
s := &Server{
@@ -1044,6 +1045,9 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
// Streaming endpoints
s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")}
s.staticEndpoints.FileSystem.register()
s.staticEndpoints.Monitor = &Monitor{srv: s}
s.staticEndpoints.Monitor.register()
}
// Register the static handlers

27
vendor/github.com/hashicorp/go-hclog/colorize_unix.go generated vendored Normal file
View File

@@ -0,0 +1,27 @@
// +build !windows
package hclog
import (
"github.com/mattn/go-isatty"
)
// setColorization will mutate the values of this logger
// to approperately configure colorization options. It provides
// a wrapper to the output stream on Windows systems.
func (l *intLogger) setColorization(opts *LoggerOptions) {
switch opts.Color {
case ColorOff:
fallthrough
case ForceColor:
return
case AutoColor:
fi := l.checkWriterIsFile()
isUnixTerm := isatty.IsTerminal(fi.Fd())
isCygwinTerm := isatty.IsCygwinTerminal(fi.Fd())
isTerm := isUnixTerm || isCygwinTerm
if !isTerm {
l.writer.color = ColorOff
}
}
}

View File

@@ -0,0 +1,33 @@
// +build windows
package hclog
import (
"os"
colorable "github.com/mattn/go-colorable"
"github.com/mattn/go-isatty"
)
// setColorization will mutate the values of this logger
// to approperately configure colorization options. It provides
// a wrapper to the output stream on Windows systems.
func (l *intLogger) setColorization(opts *LoggerOptions) {
switch opts.Color {
case ColorOff:
return
case ForceColor:
fi := l.checkWriterIsFile()
l.writer.w = colorable.NewColorable(fi)
case AutoColor:
fi := l.checkWriterIsFile()
isUnixTerm := isatty.IsTerminal(os.Stdout.Fd())
isCygwinTerm := isatty.IsCygwinTerminal(os.Stdout.Fd())
isTerm := isUnixTerm || isCygwinTerm
if !isTerm {
l.writer.color = ColorOff
return
}
l.writer.w = colorable.NewColorable(fi)
}
}

View File

@@ -2,6 +2,11 @@ module github.com/hashicorp/go-hclog
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.7.0
github.com/mattn/go-colorable v0.1.4
github.com/mattn/go-isatty v0.0.10
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.2.2
)
go 1.13

View File

@@ -1,6 +1,16 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10=
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

195
vendor/github.com/hashicorp/go-hclog/interceptlogger.go generated vendored Normal file
View File

@@ -0,0 +1,195 @@
package hclog
import (
"sync"
"sync/atomic"
)
var _ Logger = &interceptLogger{}
type interceptLogger struct {
Logger
sync.Mutex
sinkCount *int32
Sinks map[SinkAdapter]struct{}
}
func NewInterceptLogger(opts *LoggerOptions) InterceptLogger {
intercept := &interceptLogger{
Logger: New(opts),
sinkCount: new(int32),
Sinks: make(map[SinkAdapter]struct{}),
}
atomic.StoreInt32(intercept.sinkCount, 0)
return intercept
}
// Emit the message and args at TRACE level to log and sinks
func (i *interceptLogger) Trace(msg string, args ...interface{}) {
i.Logger.Trace(msg, args...)
if atomic.LoadInt32(i.sinkCount) == 0 {
return
}
i.Lock()
defer i.Unlock()
for s := range i.Sinks {
s.Accept(i.Name(), Trace, msg, i.retrieveImplied(args...)...)
}
}
// Emit the message and args at DEBUG level to log and sinks
func (i *interceptLogger) Debug(msg string, args ...interface{}) {
i.Logger.Debug(msg, args...)
if atomic.LoadInt32(i.sinkCount) == 0 {
return
}
i.Lock()
defer i.Unlock()
for s := range i.Sinks {
s.Accept(i.Name(), Debug, msg, i.retrieveImplied(args...)...)
}
}
// Emit the message and args at INFO level to log and sinks
func (i *interceptLogger) Info(msg string, args ...interface{}) {
i.Logger.Info(msg, args...)
if atomic.LoadInt32(i.sinkCount) == 0 {
return
}
i.Lock()
defer i.Unlock()
for s := range i.Sinks {
s.Accept(i.Name(), Info, msg, i.retrieveImplied(args...)...)
}
}
// Emit the message and args at WARN level to log and sinks
func (i *interceptLogger) Warn(msg string, args ...interface{}) {
i.Logger.Warn(msg, args...)
if atomic.LoadInt32(i.sinkCount) == 0 {
return
}
i.Lock()
defer i.Unlock()
for s := range i.Sinks {
s.Accept(i.Name(), Warn, msg, i.retrieveImplied(args...)...)
}
}
// Emit the message and args at ERROR level to log and sinks
func (i *interceptLogger) Error(msg string, args ...interface{}) {
i.Logger.Error(msg, args...)
if atomic.LoadInt32(i.sinkCount) == 0 {
return
}
i.Lock()
defer i.Unlock()
for s := range i.Sinks {
s.Accept(i.Name(), Error, msg, i.retrieveImplied(args...)...)
}
}
func (i *interceptLogger) retrieveImplied(args ...interface{}) []interface{} {
top := i.Logger.ImpliedArgs()
cp := make([]interface{}, len(top)+len(args))
copy(cp, top)
copy(cp[len(top):], args)
return cp
}
// Create a new sub-Logger that a name decending from the current name.
// This is used to create a subsystem specific Logger.
// Registered sinks will subscribe to these messages as well.
func (i *interceptLogger) Named(name string) Logger {
var sub interceptLogger
sub = *i
sub.Logger = i.Logger.Named(name)
return &sub
}
// Create a new sub-Logger with an explicit name. This ignores the current
// name. This is used to create a standalone logger that doesn't fall
// within the normal hierarchy. Registered sinks will subscribe
// to these messages as well.
func (i *interceptLogger) ResetNamed(name string) Logger {
var sub interceptLogger
sub = *i
sub.Logger = i.Logger.ResetNamed(name)
return &sub
}
// Create a new sub-Logger that a name decending from the current name.
// This is used to create a subsystem specific Logger.
// Registered sinks will subscribe to these messages as well.
func (i *interceptLogger) NamedIntercept(name string) InterceptLogger {
var sub interceptLogger
sub = *i
sub.Logger = i.Logger.Named(name)
return &sub
}
// Create a new sub-Logger with an explicit name. This ignores the current
// name. This is used to create a standalone logger that doesn't fall
// within the normal hierarchy. Registered sinks will subscribe
// to these messages as well.
func (i *interceptLogger) ResetNamedIntercept(name string) InterceptLogger {
var sub interceptLogger
sub = *i
sub.Logger = i.Logger.ResetNamed(name)
return &sub
}
// Return a sub-Logger for which every emitted log message will contain
// the given key/value pairs. This is used to create a context specific
// Logger.
func (i *interceptLogger) With(args ...interface{}) Logger {
var sub interceptLogger
sub = *i
sub.Logger = i.Logger.With(args...)
return &sub
}
// RegisterSink attaches a SinkAdapter to interceptLoggers sinks.
func (i *interceptLogger) RegisterSink(sink SinkAdapter) {
i.Lock()
defer i.Unlock()
i.Sinks[sink] = struct{}{}
atomic.AddInt32(i.sinkCount, 1)
}
// DeregisterSink removes a SinkAdapter from interceptLoggers sinks.
func (i *interceptLogger) DeregisterSink(sink SinkAdapter) {
i.Lock()
defer i.Unlock()
delete(i.Sinks, sink)
atomic.AddInt32(i.sinkCount, -1)
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log"
"os"
"reflect"
"runtime"
"sort"
@@ -15,6 +16,8 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/fatih/color"
)
// TimeFormat to use for logging. This is a version of RFC3339 that contains
@@ -32,11 +35,18 @@ var (
Warn: "[WARN] ",
Error: "[ERROR]",
}
_levelToColor = map[Level]*color.Color{
Debug: color.New(color.FgHiWhite),
Trace: color.New(color.FgHiGreen),
Info: color.New(color.FgHiBlue),
Warn: color.New(color.FgHiYellow),
Error: color.New(color.FgHiRed),
}
)
// Make sure that intLogger is a Logger
var _ Logger = &intLogger{}
var _ MultiSinkLogger = &intLogger{}
// intLogger is an internal logger implementation. Internal in that it is
// defined entirely by this package.
@@ -52,13 +62,21 @@ type intLogger struct {
writer *writer
level *int32
sinks map[Logger]struct{}
implied []interface{}
}
// New returns a configured logger.
func New(opts *LoggerOptions) Logger {
return newLogger(opts)
}
// NewSinkAdapter returns a SinkAdapter with configured settings
// defined by LoggerOptions
func NewSinkAdapter(opts *LoggerOptions) SinkAdapter {
return newLogger(opts)
}
func newLogger(opts *LoggerOptions) *intLogger {
if opts == nil {
opts = &LoggerOptions{}
}
@@ -84,11 +102,12 @@ func New(opts *LoggerOptions) Logger {
name: opts.Name,
timeFormat: TimeFormat,
mutex: mutex,
writer: newWriter(output),
writer: newWriter(output, opts.Color),
level: new(int32),
sinks: make(map[Logger]struct{}),
}
l.setColorization(opts)
if opts.TimeFormat != "" {
l.timeFormat = opts.TimeFormat
}
@@ -98,31 +117,10 @@ func New(opts *LoggerOptions) Logger {
return l
}
func NewMultiSink(opts *LoggerOptions) MultiSinkLogger {
return New(opts).(MultiSinkLogger)
}
func (l *intLogger) RegisterSink(logger Logger) {
l.mutex.Lock()
defer l.mutex.Unlock()
if _, ok := l.sinks[logger]; ok {
return
}
l.sinks[logger] = struct{}{}
}
func (l *intLogger) DeregisterSink(logger Logger) {
l.mutex.Lock()
defer l.mutex.Unlock()
delete(l.sinks, logger)
}
// Log a message and a set of key/value pairs if the given level is at
// or more severe that the threshold configured in the Logger.
func (l *intLogger) Log(level Level, msg string, args ...interface{}) {
if level < Level(atomic.LoadInt32(l.level)) && len(l.sinks) == 0 {
func (l *intLogger) Log(name string, level Level, msg string, args ...interface{}) {
if level < Level(atomic.LoadInt32(l.level)) {
return
}
@@ -131,36 +129,10 @@ func (l *intLogger) Log(level Level, msg string, args ...interface{}) {
l.mutex.Lock()
defer l.mutex.Unlock()
for lh := range l.sinks {
lh, ok := lh.(*intLogger)
if !ok {
continue
}
if level < Level(atomic.LoadInt32(lh.level)) {
continue
}
// Set the sink name to the name of the calling log
lh.name = l.name
if lh.json {
lh.logJSON(t, level, msg, args...)
} else {
lh.log(t, level, msg, args...)
}
lh.writer.Flush(level)
}
if level < Level(atomic.LoadInt32(l.level)) {
return
}
if l.json {
l.logJSON(t, level, msg, args...)
l.logJSON(t, name, level, msg, args...)
} else {
l.log(t, level, msg, args...)
l.log(t, name, level, msg, args...)
}
l.writer.Flush(level)
@@ -196,7 +168,7 @@ func trimCallerPath(path string) string {
}
// Non-JSON logging format function
func (l *intLogger) log(t time.Time, level Level, msg string, args ...interface{}) {
func (l *intLogger) log(t time.Time, name string, level Level, msg string, args ...interface{}) {
l.writer.WriteString(t.Format(l.timeFormat))
l.writer.WriteByte(' ')
@@ -219,8 +191,8 @@ func (l *intLogger) log(t time.Time, level Level, msg string, args ...interface{
l.writer.WriteByte(' ')
if l.name != "" {
l.writer.WriteString(l.name)
if name != "" {
l.writer.WriteString(name)
l.writer.WriteString(": ")
}
@@ -349,8 +321,8 @@ func (l *intLogger) renderSlice(v reflect.Value) string {
}
// JSON logging function
func (l *intLogger) logJSON(t time.Time, level Level, msg string, args ...interface{}) {
vals := l.jsonMapEntry(t, level, msg)
func (l *intLogger) logJSON(t time.Time, name string, level Level, msg string, args ...interface{}) {
vals := l.jsonMapEntry(t, name, level, msg)
args = append(l.implied, args...)
if args != nil && len(args) > 0 {
@@ -392,7 +364,7 @@ func (l *intLogger) logJSON(t time.Time, level Level, msg string, args ...interf
err := json.NewEncoder(l.writer).Encode(vals)
if err != nil {
if _, ok := err.(*json.UnsupportedTypeError); ok {
plainVal := l.jsonMapEntry(t, level, msg)
plainVal := l.jsonMapEntry(t, name, level, msg)
plainVal["@warn"] = errJsonUnsupportedTypeMsg
json.NewEncoder(l.writer).Encode(plainVal)
@@ -400,7 +372,7 @@ func (l *intLogger) logJSON(t time.Time, level Level, msg string, args ...interf
}
}
func (l intLogger) jsonMapEntry(t time.Time, level Level, msg string) map[string]interface{} {
func (l intLogger) jsonMapEntry(t time.Time, name string, level Level, msg string) map[string]interface{} {
vals := map[string]interface{}{
"@message": msg,
"@timestamp": t.Format("2006-01-02T15:04:05.000000Z07:00"),
@@ -424,8 +396,8 @@ func (l intLogger) jsonMapEntry(t time.Time, level Level, msg string) map[string
vals["@level"] = levelStr
if l.name != "" {
vals["@module"] = l.name
if name != "" {
vals["@module"] = name
}
if l.caller {
@@ -438,27 +410,27 @@ func (l intLogger) jsonMapEntry(t time.Time, level Level, msg string) map[string
// Emit the message and args at DEBUG level
func (l *intLogger) Debug(msg string, args ...interface{}) {
l.Log(Debug, msg, args...)
l.Log(l.Name(), Debug, msg, args...)
}
// Emit the message and args at TRACE level
func (l *intLogger) Trace(msg string, args ...interface{}) {
l.Log(Trace, msg, args...)
l.Log(l.Name(), Trace, msg, args...)
}
// Emit the message and args at INFO level
func (l *intLogger) Info(msg string, args ...interface{}) {
l.Log(Info, msg, args...)
l.Log(l.Name(), Info, msg, args...)
}
// Emit the message and args at WARN level
func (l *intLogger) Warn(msg string, args ...interface{}) {
l.Log(Warn, msg, args...)
l.Log(l.Name(), Warn, msg, args...)
}
// Emit the message and args at ERROR level
func (l *intLogger) Error(msg string, args ...interface{}) {
l.Log(Error, msg, args...)
l.Log(l.Name(), Error, msg, args...)
}
// Indicate that the logger would emit TRACE level logs
@@ -576,3 +548,28 @@ func (l *intLogger) StandardWriter(opts *StandardLoggerOptions) io.Writer {
forceLevel: opts.ForceLevel,
}
}
// checks if the underlying io.Writer is a file, and
// panics if not. For use by colorization.
func (l *intLogger) checkWriterIsFile() *os.File {
fi, ok := l.writer.w.(*os.File)
if !ok {
panic("Cannot enable coloring of non-file Writers")
}
return fi
}
// Accept implements the SinkAdapter interface
func (i *intLogger) Accept(name string, level Level, msg string, args ...interface{}) {
i.Log(name, level, msg, args...)
}
// ImpliedArgs returns the loggers implied args
func (i *intLogger) ImpliedArgs() []interface{} {
return i.implied
}
// Name returns the loggers name
func (i *intLogger) Name() string {
return i.name
}

View File

@@ -53,6 +53,21 @@ func Fmt(str string, args ...interface{}) Format {
return append(Format{str}, args...)
}
// ColorOption expresses how the output should be colored, if at all.
type ColorOption uint8
const (
// ColorOff is the default coloration, and does not
// inject color codes into the io.Writer.
ColorOff ColorOption = iota
// AutoColor checks if the io.Writer is a tty,
// and if so enables coloring.
AutoColor
// ForceColor will enable coloring, regardless of whether
// the io.Writer is a tty or not.
ForceColor
)
// LevelFromString returns a Level type for the named log level, or "NoLevel" if
// the level string is invalid. This facilitates setting the log level via
// config or environment variable by name in a predictable way.
@@ -111,9 +126,14 @@ type Logger interface {
// Indicate if ERROR logs would be emitted. This and the other Is* guards
IsError() bool
ImpliedArgs() []interface{}
// Creates a sublogger that will always have the given key/value pairs
With(args ...interface{}) Logger
// Returns the Name of the logger
Name() string
// Create a logger that will prepend the name string on the front of all messages.
// If the logger already has a name, the new value will be appended to the current
// name. That way, a major subsystem can use this to decorate all it's own logs
@@ -136,17 +156,6 @@ type Logger interface {
StandardWriter(opts *StandardLoggerOptions) io.Writer
}
// MultiSinkLogger describes the interface that allows a logger to
// write to multiple sub loggers which may be configured to have different
// level and writer settings. This is useful for monitor commands to allow
// for streaming of logs at a lower level than what is set for the parent logger
type MultiSinkLogger interface {
Logger
RegisterSink(logger Logger)
DeregisterSink(logger Logger)
}
// StandardLoggerOptions can be used to configure a new standard logger.
type StandardLoggerOptions struct {
// Indicate that some minimal parsing should be done on strings to try
@@ -184,4 +193,41 @@ type LoggerOptions struct {
// The time format to use instead of the default
TimeFormat string
// Color the output. On Windows, colored logs are only avaiable for io.Writers that
// are concretely instances of *os.File.
Color ColorOption
}
// InterceptLogger describes the interface for using a logger
// that can register different output sinks.
// This is useful for sending lower level log messages
// to a different output while keeping the root logger
// at a higher one.
type InterceptLogger interface {
// Logger is the root logger for an InterceptLogger
Logger
// RegisterSink adds a SinkAdapter to the InterceptLogger
RegisterSink(sink SinkAdapter)
// DeregisterSink removes a SinkAdapter from the InterceptLogger
DeregisterSink(sink SinkAdapter)
// Create a interceptlogger that will prepend the name string on the front of all messages.
// If the logger already has a name, the new value will be appended to the current
// name. That way, a major subsystem can use this to decorate all it's own logs
// without losing context.
NamedIntercept(name string) InterceptLogger
// Create a interceptlogger that will prepend the name string on the front of all messages.
// This sets the name of the logger to the value directly, unlike Named which honor
// the current name as well.
ResetNamedIntercept(name string) InterceptLogger
}
// SinkAdapter describes the interface that must be implemented
// in order to Register a new sink to an InterceptLogger
type SinkAdapter interface {
Accept(name string, level Level, msg string, args ...interface{})
}

View File

@@ -35,8 +35,12 @@ func (l *nullLogger) IsWarn() bool { return false }
func (l *nullLogger) IsError() bool { return false }
func (l *nullLogger) ImpliedArgs() []interface{} { return []interface{}{} }
func (l *nullLogger) With(args ...interface{}) Logger { return l }
func (l *nullLogger) Name() string { return "" }
func (l *nullLogger) Named(name string) Logger { return l }
func (l *nullLogger) ResetNamed(name string) Logger { return l }

View File

@@ -6,19 +6,27 @@ import (
)
type writer struct {
b bytes.Buffer
w io.Writer
b bytes.Buffer
w io.Writer
color ColorOption
}
func newWriter(w io.Writer) *writer {
return &writer{w: w}
func newWriter(w io.Writer, color ColorOption) *writer {
return &writer{w: w, color: color}
}
func (w *writer) Flush(level Level) (err error) {
var unwritten = w.b.Bytes()
if w.color != ColorOff {
color := _levelToColor[level]
unwritten = []byte(color.Sprintf("%s", unwritten))
}
if lw, ok := w.w.(LevelWriter); ok {
_, err = lw.LevelWrite(level, w.b.Bytes())
_, err = lw.LevelWrite(level, unwritten)
} else {
_, err = w.w.Write(w.b.Bytes())
_, err = w.w.Write(unwritten)
}
w.b.Reset()
return err

2
vendor/vendor.json vendored
View File

@@ -209,7 +209,7 @@
{"path":"github.com/hashicorp/go-envparse","checksumSHA1":"FKmqR4DC3nCXtnT9pe02z5CLNWo=","revision":"310ca1881b22af3522e3a8638c0b426629886196","revisionTime":"2018-01-19T21:58:41Z"},
{"path":"github.com/hashicorp/go-getter","checksumSHA1":"d4brua17AGQqMNtngK4xKOUwboY=","revision":"f5101da0117392c6e7960c934f05a2fd689a5b5f","revisionTime":"2019-08-22T19:45:07Z"},
{"path":"github.com/hashicorp/go-getter/helper/url","checksumSHA1":"9J+kDr29yDrwsdu2ULzewmqGjpA=","revision":"b345bfcec894fb7ff3fdf9b21baf2f56ea423d98","revisionTime":"2018-04-10T17:49:45Z"},
{"path":"github.com/hashicorp/go-hclog","checksumSHA1":"uTAjKuGQr4/gpcgdEtTO+JhD/NY=","revision":"a4c7052ea48d1c284eca6ba6281910f0fd3b7b30","revisionTime":"2019-10-10T18:01:30Z","version":"f-multi-sink-logger","versionExact":"f-multi-sink-logger"},
{"path":"github.com/hashicorp/go-hclog","checksumSHA1":"+gkR2S9qSVVy3gor4p0Z86RMgZI=","revision":"2bec91ee9db63c8546a105f89da3eb63037c5e6c","revisionTime":"2019-10-18T20:14:58Z","version":"f-multi-sink","versionExact":"f-multi-sink"},
{"path":"github.com/hashicorp/go-immutable-radix","checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"},
{"path":"github.com/hashicorp/go-memdb","checksumSHA1":"FMAvwDar2bQyYAW4XMFhAt0J5xA=","revision":"20ff6434c1cc49b80963d45bf5c6aa89c78d8d57","revisionTime":"2017-08-31T20:15:40Z"},
{"path":"github.com/hashicorp/go-msgpack/codec","checksumSHA1":"CKGYNUDKre3Z2g4hHNVfp5nTcfA=","revision":"23165f7bc3c2dda1891434ebb9da1511a7bafc1c","revisionTime":"2019-09-27T12:33:13Z","version":"upstream-08f7b40","versionExact":"upstream-08f7b40"},