From 12819975ee7ae152b07ca62a67efb58d264ba18e Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 10 Oct 2019 15:30:37 -0400 Subject: [PATCH] remove log_writer prefix output with proper spacing update gzip handler, adjust first byte flow to allow gzip handler bypass wip, first stab at wiring up rpc endpoint --- api/agent.go | 5 +- api/agent_test.go | 27 +++-- client/client.go | 8 +- client/config/config.go | 2 +- client/monitor_endpoint.go | 153 +++++++++++++++++++++++++ client/rpc.go | 3 + client/structs/structs.go | 10 ++ command/agent/agent_endpoint.go | 164 ++++++++++++++++++++++----- command/agent/agent_endpoint_test.go | 38 ++++++- command/agent/command.go | 17 ++- command/agent/http.go | 2 +- command/agent/log_writer.go | 83 -------------- command/agent/log_writer_test.go | 52 --------- command/agent_monitor.go | 14 ++- vendor/vendor.json | 2 +- 15 files changed, 387 insertions(+), 193 deletions(-) create mode 100644 client/monitor_endpoint.go delete mode 100644 command/agent/log_writer.go delete mode 100644 command/agent/log_writer_test.go diff --git a/api/agent.go b/api/agent.go index 6df8482bd..1fa20a00e 100644 --- a/api/agent.go +++ b/api/agent.go @@ -240,7 +240,7 @@ func (a *Agent) Health() (*AgentHealthResponse, error) { // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop log streaming -func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { +func (a *Agent) Monitor(loglevel string, nodeID string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { r, err := a.client.newRequest("GET", "/v1/agent/monitor") if err != nil { return nil, err @@ -250,6 +250,9 @@ func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions if loglevel != "" { r.params.Add("loglevel", loglevel) } + if nodeID != "" { + r.params.Add("nodeID", nodeID) + } _, resp, err := requireOK(a.client.doRequest(r)) if err != nil { diff --git a/api/agent_test.go b/api/agent_test.go index b80e3ac5a..080f9af42 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -3,10 +3,11 @@ package api import ( "reflect" "sort" - "strings" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/api/internal/testutil" "github.com/stretchr/testify/assert" ) @@ -267,19 +268,27 @@ func TestAgent_Monitor(t *testing.T) { agent := c.Agent() - logCh, err := agent.Monitor("info", nil, nil) + doneCh := make(chan struct{}) + logCh, err := agent.Monitor("debug", doneCh, nil) + defer close(doneCh) if err != nil { t.Fatalf("err: %v", err) } + // make a request to generate some logs + _, err = agent.Region() + require.NoError(t, err) + // Wait for the first log message and validate it - select { - case log := <-logCh: - // TODO: checkout why stub_asset.go help text returns here - if !strings.Contains(log, "[INFO ] nomad: raft: Initial configuration") { - t.Fatalf("bad: %q", log) + for { + select { + case log := <-logCh: + if log == " " { + return + } + require.Contains(t, log, "[DEBUG]") + case <-time.After(10 * time.Second): + require.Fail(t, "failed to get a log message") } - case <-time.After(1000 * time.Second): - t.Fatalf("failed to get a log message") } } diff --git a/client/client.go b/client/client.go index e5d688ef1..a1bf34c08 100644 --- a/client/client.go +++ b/client/client.go @@ -163,8 +163,8 @@ type Client struct { configCopy *config.Config configLock sync.RWMutex - logger hclog.Logger - rpcLogger hclog.Logger + logger hclog.MultiSinkLogger + rpcLogger hclog.MultiSinkLogger connPool *pool.ConnPool @@ -304,7 +304,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic } // Create the logger - logger := cfg.Logger.ResetNamed("client") + logger := cfg.Logger.ResetNamed("client").(hclog.MultiSinkLogger) // Create the client c := &Client{ @@ -316,7 +316,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic tlsWrap: tlsWrap, streamingRpcs: structs.NewStreamingRpcRegistry(), logger: logger, - rpcLogger: logger.Named("rpc"), + rpcLogger: logger.Named("rpc").(hclog.MultiSinkLogger), allocs: make(map[string]AllocRunner), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), diff --git a/client/config/config.go b/client/config/config.go index 4783bea56..0df3cac27 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -81,7 +81,7 @@ type Config struct { LogOutput io.Writer // Logger provides a logger to thhe client - Logger log.Logger + Logger log.MultiSinkLogger // Region is the clients region Region string diff --git a/client/monitor_endpoint.go b/client/monitor_endpoint.go new file mode 100644 index 000000000..315714ec6 --- /dev/null +++ b/client/monitor_endpoint.go @@ -0,0 +1,153 @@ +package client + +import ( + "context" + "errors" + "io" + "sync" + "time" + + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" + + metrics "github.com/armon/go-metrics" + log "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" +) + +type Monitor struct { + c *Client +} + +func NewMonitorEndpoint(c *Client) *Monitor { + m := &Monitor{c: c} + m.c.streamingRpcs.Register("Client.Monitor", m.monitor) + return m +} + +func (m *Monitor) monitor(conn io.ReadWriteCloser) { + defer metrics.MeasureSince([]string{"client", "monitor", "monitor"}, time.Now()) + // defer conn.Close() + + // Decode arguments + var req cstructs.MonitorRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&req); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + // Check acl + if aclObj, err := m.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(403), encoder) + return + } else if aclObj != nil && !aclObj.AllowNsOp(req.Namespace, acl.NamespaceCapabilityReadFS) { + handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) + return + } + + var logLevel log.Level + if req.LogLevel == "" { + logLevel = log.LevelFromString("INFO") + } else { + logLevel = log.LevelFromString(req.LogLevel) + } + + if logLevel == log.NoLevel { + handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder) + return + } + + // var buf bytes.Buffer + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamWriter := newStreamWriter(512) + + streamLog := log.New(&log.LoggerOptions{ + Level: logLevel, + Output: streamWriter, + }) + m.c.logger.RegisterSink(streamLog) + defer m.c.logger.DeregisterSink(streamLog) + + go func() { + for { + if _, err := conn.Read(nil); err != nil { + // One end of the pipe was explicitly closed, exit cleanly + cancel() + return + } + select { + case <-ctx.Done(): + return + } + } + }() + + var streamErr error +OUTER: + for { + select { + case log := <-streamWriter.logCh: + var resp cstructs.StreamErrWrapper + if err := encoder.Encode(resp); err != nil { + streamErr = err + break OUTER + } + resp.Payload = []byte(log) + encoder.Reset(conn) + case <-ctx.Done(): + break OUTER + } + } + + if streamErr != nil { + handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder) + return + } + +} + +type streamWriter struct { + sync.Mutex + logs []string + logCh chan string + index int + droppedCount int +} + +func newStreamWriter(buf int) *streamWriter { + return &streamWriter{ + logs: make([]string, buf), + logCh: make(chan string, buf), + index: 0, + } +} + +func (d *streamWriter) Write(p []byte) (n int, err error) { + d.Lock() + defer d.Unlock() + + // Strip off newlines at the end if there are any since we store + // individual log lines in the agent. + n = len(p) + if p[n-1] == '\n' { + p = p[:n-1] + } + + d.logs[d.index] = string(p) + d.index = (d.index + 1) % len(d.logs) + + select { + case d.logCh <- string(p): + default: + d.droppedCount++ + } + return +} diff --git a/client/rpc.go b/client/rpc.go index beaec6f2c..6a3b83717 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -23,6 +23,7 @@ type rpcEndpoints struct { ClientStats *ClientStats FileSystem *FileSystem Allocations *Allocations + Monitor *Monitor } // ClientRPC is used to make a local, client only RPC call @@ -218,6 +219,7 @@ func (c *Client) setupClientRpc() { c.endpoints.ClientStats = &ClientStats{c} c.endpoints.FileSystem = NewFileSystemEndpoint(c) c.endpoints.Allocations = NewAllocationsEndpoint(c) + c.endpoints.Monitor = NewMonitorEndpoint(c) // Create the RPC Server c.rpcServer = rpc.NewServer() @@ -234,6 +236,7 @@ func (c *Client) setupClientRpcServer(server *rpc.Server) { server.Register(c.endpoints.ClientStats) server.Register(c.endpoints.FileSystem) server.Register(c.endpoints.Allocations) + server.Register(c.endpoints.Monitor) } // rpcConnListener is a long lived function that listens for new connections diff --git a/client/structs/structs.go b/client/structs/structs.go index eff8ceaf3..6a6206744 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -34,6 +34,16 @@ type ClientStatsResponse struct { structs.QueryMeta } +type MonitorRequest struct { + // LogLevel is the log level filter we want to stream logs on + LogLevel string + + // LogJSON specifies if log format should be unstructured or json + LogJSON bool + + structs.QueryOptions +} + // AllocFileInfo holds information about a file inside the AllocDir type AllocFileInfo struct { Name string diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index ac5597dfb..46401fc61 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -1,19 +1,25 @@ package agent import ( + "bytes" + "context" "encoding/json" "fmt" + "io" "net" "net/http" "sort" "strings" "sync" + "github.com/docker/docker/pkg/ioutils" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/serf/serf" "github.com/mitchellh/copystructure" + "github.com/ugorji/go/codec" ) type Member struct { @@ -169,44 +175,144 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel)) } - // Create flusher for streaming - flusher, ok := resp.(http.Flusher) - if !ok { - return nil, CodedError(400, "Streaming not supported") - } + // START - streamWriter := newStreamWriter(512) + // Determine if we are targeting a server or client + nodeID := req.URL.Query().Get("nodeID") + if nodeID != "" { - streamLog := log.New(&log.LoggerOptions{ - Level: log.LevelFromString(logLevel), - Output: streamWriter, - }) - s.agent.logger.RegisterSink(streamLog) - defer s.agent.logger.DeregisterSink(streamLog) + // Build the request and parse the ACL token + args := cstructs.MonitorRequest{ + LogLevel: logLevel, + LogJSON: false, + } + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) - notify := resp.(http.CloseNotifier).CloseNotify() + // Determine the handler to use + useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForNode(nodeID) - // Send header so client can start streaming body - resp.WriteHeader(http.StatusOK) + // Make the RPC + var handler structs.StreamingRpcHandler + var handlerErr error + if useLocalClient { + handler, handlerErr = s.agent.Client().StreamingRpcHandler("Client.Monitor") + } else if useClientRPC { + handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler("Client.Monitor") + } else if useServerRPC { + handler, handlerErr = s.agent.Server().StreamingRpcHandler("Client.Monitor") + } else { + handlerErr = CodedError(400, "No local Node and node_id not provided") + } - // 0 byte write is needed before the Flush call so that if we are using - // a gzip stream it will go ahead and write out the HTTP response header - resp.Write([]byte("")) - flusher.Flush() + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) + } + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) - for { - select { - case <-notify: - s.agent.logger.DeregisterSink(streamLog) - if streamWriter.droppedCount > 0 { - s.agent.logger.Warn(fmt.Sprintf("agent: Dropped %d logs during monitor request", streamWriter.droppedCount)) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + httpPipe.Close() + }() + + // Create an ouput that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + // Create a channel that decodes the results + errCh := make(chan HTTPCodedError, 2) + + // stream the response + go func() { + defer cancel() + + // Send the request + if err := encoder.Encode(args); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + + for { + select { + case <-ctx.Done(): + errCh <- nil + return + default: + } + + var res cstructs.StreamErrWrapper + if err := decoder.Decode(&res); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + decoder.Reset(httpPipe) + + if err := res.Error; err != nil { + if err.Code != nil { + errCh <- CodedError(int(*err.Code), err.Error()) + return + } + } + + if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + } + }() + + handler(handlerPipe) + cancel() + codedErr := <-errCh + + if codedErr != nil && + (codedErr == io.EOF || + strings.Contains(codedErr.Error(), "closed") || + strings.Contains(codedErr.Error(), "EOF")) { + codedErr = nil + } + return nil, codedErr + } else { + // Create flusher for streaming + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, CodedError(400, "Streaming not supported") + } + + streamWriter := newStreamWriter(512) + streamLog := log.New(&log.LoggerOptions{ + Level: log.LevelFromString(logLevel), + Output: streamWriter, + }) + s.agent.logger.RegisterSink(streamLog) + defer s.agent.logger.DeregisterSink(streamLog) + + notify := resp.(http.CloseNotifier).CloseNotify() + + // Send header so client can start streaming body + resp.WriteHeader(http.StatusOK) + // gziphanlder needs a byte to be written and flushed in order + // to tell gzip handler to ignore this response and not compress + resp.Write([]byte("\n")) + flusher.Flush() + + for { + select { + case <-notify: + s.agent.logger.DeregisterSink(streamLog) + if streamWriter.droppedCount > 0 { + s.agent.logger.Warn(fmt.Sprintf("Dropped %d logs during monitor request", streamWriter.droppedCount)) + } + return nil, nil + case log := <-streamWriter.logCh: + fmt.Fprintln(resp, log) + flusher.Flush() } - return nil, nil - case log := <-streamWriter.logCh: - fmt.Fprintln(resp, log) - flusher.Flush() } } + + return nil, nil } type streamWriter struct { diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 9909e3162..50c4a0d25 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -255,7 +255,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { { - req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=unkown", nil) + req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=unknown", nil) require.Nil(t, err) resp := newClosableRecorder() @@ -301,6 +301,42 @@ func TestHTTP_AgentMonitor(t *testing.T) { require.Fail(t, err.Error()) }) } + + // stream logs for a given node + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=warn&nodeID="+s.client.NodeID(), nil) + require.Nil(t, err) + resp := newClosableRecorder() + defer resp.Close() + + go func() { + _, err = s.Server.AgentMonitor(resp, req) + require.NoError(t, err) + }() + + // send the same log a few times until monitor sink is + // fully set up + maxLogAttempts := 10 + tried := 0 + testutil.WaitForResult(func() (bool, error) { + if tried < maxLogAttempts { + s.Server.logger.Debug("log that should not be sent") + s.Server.logger.Warn("log that should be sent") + tried++ + } + + got := resp.Body.String() + want := "[WARN] http: log that should be sent" + if strings.Contains(got, want) { + require.NotContains(t, resp.Body.String(), "[DEBUG]") + return true, nil + } + + return false, fmt.Errorf("missing expected log, got: %v, want: %v", got, want) + }, func(err error) { + require.Fail(t, err.Error()) + }) + } }) } diff --git a/command/agent/command.go b/command/agent/command.go index c0a441e5e..af20fb0ff 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -373,8 +373,8 @@ func (c *Command) isValidConfig(config, cmdConfig *Config) bool { return true } -// setupLoggers is used to setup the logGate, logWriter, and our logOutput -func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, io.Writer) { +// setupLoggers is used to setup the logGate, and our logOutput +func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, io.Writer) { // Setup logging. First create the gated log writer, which will // store logs until we're ready to show them. Then create the level // filter, filtering logs of the specified level. @@ -390,19 +390,18 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, c.Ui.Error(fmt.Sprintf( "Invalid log level: %s. Valid log levels are: %v", c.logFilter.MinLevel, c.logFilter.Levels)) - return nil, nil, nil + return nil, nil } // Create a log writer, and wrap a logOutput around it - logWriter := NewLogWriter(512) - writers := []io.Writer{c.logFilter, logWriter} + writers := []io.Writer{c.logFilter} // Check if syslog is enabled if config.EnableSyslog { l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "nomad") if err != nil { c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err)) - return nil, nil, nil + return nil, nil } writers = append(writers, &SyslogWrapper{l, c.logFilter}) } @@ -422,7 +421,7 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, duration, err := time.ParseDuration(config.LogRotateDuration) if err != nil { c.Ui.Error(fmt.Sprintf("Failed to parse log rotation duration: %v", err)) - return nil, nil, nil + return nil, nil } logRotateDuration = duration } else { @@ -444,7 +443,7 @@ func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, c.logOutput = io.MultiWriter(writers...) log.SetOutput(c.logOutput) - return logGate, logWriter, c.logOutput + return logGate, c.logOutput } // setupAgent is used to start the agent and various interfaces @@ -597,7 +596,7 @@ func (c *Command) Run(args []string) int { } // Setup the log outputs - logGate, _, logOutput := c.setupLoggers(config) + logGate, logOutput := c.setupLoggers(config) if logGate == nil { return 1 } diff --git a/command/agent/http.go b/command/agent/http.go index 045a2fa49..1ee6540c4 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -108,7 +108,7 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) { // Handle requests with gzip compression // Use MinSize of 1 to allow a zero byte flush to return // response header used for streaming - gzip, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(1)) + gzip, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(0)) if err != nil { return nil, err } diff --git a/command/agent/log_writer.go b/command/agent/log_writer.go deleted file mode 100644 index ebb96878b..000000000 --- a/command/agent/log_writer.go +++ /dev/null @@ -1,83 +0,0 @@ -package agent - -import ( - "sync" -) - -// LogHandler interface is used for clients that want to subscribe -// to logs, for example to stream them over an IPC mechanism -type LogHandler interface { - HandleLog(string) -} - -// logWriter implements io.Writer so it can be used as a log sink. -// It maintains a circular buffer of logs, and a set of handlers to -// which it can stream the logs to. -type logWriter struct { - sync.Mutex - logs []string - index int - handlers map[LogHandler]struct{} -} - -// NewLogWriter creates a logWriter with the given buffer capacity -func NewLogWriter(buf int) *logWriter { - return &logWriter{ - logs: make([]string, buf), - index: 0, - handlers: make(map[LogHandler]struct{}), - } -} - -// RegisterHandler adds a log handler to receive logs, and sends -// the last buffered logs to the handler -func (l *logWriter) RegisterHandler(lh LogHandler) { - l.Lock() - defer l.Unlock() - - // Do nothing if already registered - if _, ok := l.handlers[lh]; ok { - return - } - - // Register - l.handlers[lh] = struct{}{} - - // Send the old logs - if l.logs[l.index] != "" { - for i := l.index; i < len(l.logs); i++ { - lh.HandleLog(l.logs[i]) - } - } - for i := 0; i < l.index; i++ { - lh.HandleLog(l.logs[i]) - } -} - -// DeregisterHandler removes a LogHandler and prevents more invocations -func (l *logWriter) DeregisterHandler(lh LogHandler) { - l.Lock() - defer l.Unlock() - delete(l.handlers, lh) -} - -// Write is used to accumulate new logs -func (l *logWriter) Write(p []byte) (n int, err error) { - l.Lock() - defer l.Unlock() - - // Strip off newlines at the end if there are any since we store - // individual log lines in the agent. - n = len(p) - if p[n-1] == '\n' { - p = p[:n-1] - } - - l.logs[l.index] = string(p) - l.index = (l.index + 1) % len(l.logs) - - for lh := range l.handlers { - lh.HandleLog(string(p)) - } - return -} diff --git a/command/agent/log_writer_test.go b/command/agent/log_writer_test.go deleted file mode 100644 index 19c23c573..000000000 --- a/command/agent/log_writer_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package agent - -import ( - "testing" -) - -type MockLogHandler struct { - logs []string -} - -func (m *MockLogHandler) HandleLog(l string) { - m.logs = append(m.logs, l) -} - -func TestLogWriter(t *testing.T) { - t.Parallel() - h := &MockLogHandler{} - w := NewLogWriter(4) - - // Write some logs - w.Write([]byte("one")) // Gets dropped! - w.Write([]byte("two")) - w.Write([]byte("three")) - w.Write([]byte("four")) - w.Write([]byte("five")) - - // Register a handler, sends old! - w.RegisterHandler(h) - - w.Write([]byte("six")) - w.Write([]byte("seven")) - - // Deregister - w.DeregisterHandler(h) - - w.Write([]byte("eight")) - w.Write([]byte("nine")) - - out := []string{ - "two", - "three", - "four", - "five", - "six", - "seven", - } - for idx := range out { - if out[idx] != h.logs[idx] { - t.Fatalf("mismatch %v", h.logs) - } - } -} diff --git a/command/agent_monitor.go b/command/agent_monitor.go index aeffc172e..1d0341c1e 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -6,6 +6,8 @@ import ( "os/signal" "strings" "syscall" + + "github.com/mitchellh/cli" ) type MonitorCommand struct { @@ -35,11 +37,19 @@ func (c *MonitorCommand) Synopsis() string { func (c *MonitorCommand) Name() string { return "monitor" } func (c *MonitorCommand) Run(args []string) int { - var logLevel string + c.Ui = &cli.PrefixedUi{ + OutputPrefix: " ", + InfoPrefix: " ", + ErrorPrefix: "==> ", + Ui: c.Ui, + } + var logLevel string + var nodeID string flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.StringVar(&logLevel, "log-level", "", "") + flags.StringVar(&nodeID, "node-id", "", "") if err := flags.Parse(args); err != nil { return 1 @@ -53,7 +63,7 @@ func (c *MonitorCommand) Run(args []string) int { } eventDoneCh := make(chan struct{}) - logCh, err := client.Agent().Monitor(logLevel, eventDoneCh, nil) + logCh, err := client.Agent().Monitor(logLevel, nodeID, eventDoneCh, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err)) c.Ui.Error(commandErrorText(c)) diff --git a/vendor/vendor.json b/vendor/vendor.json index 275933b8b..95ef75246 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -20,7 +20,7 @@ {"path":"github.com/Microsoft/go-winio/pkg/guid","checksumSHA1":"/ykkyb7gmtZC68n7T24xwbmlCBc=","origin":"github.com/endocrimes/go-winio/pkg/guid","revision":"fb47a8b419480a700368c176bc1d5d7e3393b98d","revisionTime":"2019-06-20T17:03:19Z","version":"dani/safe-relisten","versionExact":"dani/safe-relisten"}, {"path":"github.com/NVIDIA/gpu-monitoring-tools","checksumSHA1":"kF1vk+8Xvb3nGBiw9+qbUc0SZ4M=","revision":"86f2a9fac6c5b597dc494420005144b8ef7ec9fb","revisionTime":"2018-08-29T22:20:09Z"}, {"path":"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml","checksumSHA1":"P8FATSSgpe5A17FyPrGpsX95Xw8=","revision":"86f2a9fac6c5b597dc494420005144b8ef7ec9fb","revisionTime":"2018-08-29T22:20:09Z"}, - {"path":"github.com/NYTimes/gziphandler","checksumSHA1":"Ylaw7hBEShLk8L5U89e7l6OKWKo=","revision":"dd0439581c7657cb652dfe5c71d7d48baf39541d","revisionTime":"2019-02-21T23:16:47Z"}, + {"path":"github.com/NYTimes/gziphandler","checksumSHA1":"Ylaw7hBEShLk8L5U89e7l6OKWKo=","revision":"dd0439581c7657cb652dfe5c71d7d48baf39541d","revisionTime":"2019-02-21T23:16:47Z","version":"master","versionExact":"master"}, {"path":"github.com/Nvveen/Gotty","checksumSHA1":"Aqy8/FoAIidY/DeQ5oTYSZ4YFVc=","revision":"cd527374f1e5bff4938207604a14f2e38a9cf512","revisionTime":"2012-06-04T00:48:16Z"}, {"path":"github.com/StackExchange/wmi","checksumSHA1":"qtjd74+bErubh+qyv3s+lWmn9wc=","revision":"ea383cf3ba6ec950874b8486cd72356d007c768f","revisionTime":"2017-04-10T19:29:09Z"}, {"path":"github.com/agext/levenshtein","checksumSHA1":"jQh1fnoKPKMURvKkpdRjN695nAQ=","revision":"5f10fee965225ac1eecdc234c09daf5cd9e7f7b6","revisionTime":"2017-02-17T06:30:20Z"},