diff --git a/client/client.go b/client/client.go index 05f5afc93..8a17b414b 100644 --- a/client/client.go +++ b/client/client.go @@ -81,10 +81,6 @@ const ( // allocSyncRetryIntv is the interval on which we retry updating // the status of the allocation allocSyncRetryIntv = 5 * time.Second - - // nodeEventsEmitIntv is the interval at which node events are synced with - // the server - nodeEventsEmitIntv = 3 * time.Second ) // ClientStatsReporter exposes all the APIs related to resource usage of a Nomad @@ -1062,7 +1058,7 @@ func (c *Client) registerAndHeartbeat() { go c.watchNodeUpdates() // Start watching for emitting node events - go c.watchEmitEvents() + go c.watchNodeEvents() // Setup the heartbeat timer, for the initial registration // we want to do this quickly. We want to do it extra quickly @@ -1147,7 +1143,7 @@ func (c *Client) run() { // these kinds of events include when a driver moves from healthy to unhealthy // (and vice versa) func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { - nodeID := c.Node().ID + nodeID := c.NodeID() nodeEvents := map[string][]*structs.NodeEvent{ nodeID: events, } @@ -1159,44 +1155,42 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil { return fmt.Errorf("Emitting node event failed: %v", err) } - c.logger.Printf("[INFO] client: emit node events complete") return nil } -// watchEmitEvents is a handler which receives node events and on a interval and -// submits them in batch format to the server -func (c *Client) watchEmitEvents() { - batchEvents := make([]*structs.NodeEvent, 0) +// watchNodeEvents is a handler which receives node events and on a interval +// and submits them in batch format to the server +func (c *Client) watchNodeEvents() { + // batchEvents stores events that have yet to be published + var batchEvents []*structs.NodeEvent - timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv)) + // Create and drain the timer + timer := time.NewTimer(0) + timer.Stop() + select { + case <-timer.C: + default: + } defer timer.Stop() for { select { case event := <-c.triggerEmitNodeEvent: - batchEvents = append(batchEvents, event) - - case <-timer.C: + if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents { + batchEvents = append(batchEvents, event) + } else { + // Drop the oldest event + c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0]) + batchEvents = append(batchEvents[1:], event) + } timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) - - if len(batchEvents) == 0 { - // if we haven't received any events to emit, continue until the next - // time interval - continue + case <-timer.C: + if err := c.submitNodeEvents(batchEvents); err != nil { + c.logger.Printf("[ERR] client: submitting node events failed: %v", err) + timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) } - - err := c.submitNodeEvents(batchEvents) - if err != nil { - batchEvents = make([]*structs.NodeEvent, 0) - c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err) - } else if len(batchEvents) >= structs.MaxRetainedNodeEvents { - // Truncate list to under 10 - batchEvents = make([]*structs.NodeEvent, 0) - } - case <-c.shutdownCh: return - default: } } } diff --git a/command/node_status.go b/command/node_status.go index bf5fef683..60fd5c358 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -389,9 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { - if !c.verbose { - return - } c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) c.outputNodeEvent(node.NodeEvents) @@ -400,14 +397,22 @@ func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { size := len(events) nodeEvents := make([]string, size+1) - nodeEvents[0] = "Time|Subsystem|Message|Details" + if c.verbose { + nodeEvents[0] = "Time|Subsystem|Message|Details" + } else { + nodeEvents[0] = "Time|Subsystem|Message" + } for i, event := range events { timestamp := formatUnixNanoTime(event.Timestamp) subsystem := event.Subsystem msg := event.Message - details := formatEventDetails(event.Details) - nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details) + if c.verbose { + details := formatEventDetails(event.Details) + nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details) + } else { + nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg) + } } c.Ui.Output(formatList(nodeEvents)) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 916c7ee08..ca7c0cb80 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -58,6 +58,12 @@ func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.Em } defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) + if args.NodeEvents == nil { + err := fmt.Errorf("No event to add; node event map is nil") + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) + return err + } + _, index, err := n.srv.raftApply(structs.AddNodeEventsType, args) if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 5a774c22b..58399e56b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3722,8 +3722,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str } // Copy the existing node - copyNode := new(structs.Node) - *copyNode = *node + copyNode := node.Copy() nodeEvents := node.NodeEvents @@ -3733,7 +3732,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str // keep node events pruned to below 10 simultaneously if len(nodeEvents) >= structs.MaxRetainedNodeEvents { - delta := len(nodeEvents) - 10 + delta := len(nodeEvents) - structs.MaxRetainedNodeEvents nodeEvents = nodeEvents[delta+1:] } nodeEvents = append(nodeEvents, e) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e9d2e7d49..78b507025 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1188,6 +1188,15 @@ type NodeEvent struct { ModifyIndex uint64 } +func (ne *NodeEvent) String() string { + var details string + for k, v := range ne.Details { + details = fmt.Sprintf("%s: %s", k, v) + } + + return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), details, ne.Timestamp) +} + // EmitNodeEventsRequest is a request to update the node events source // with a new client-side event type EmitNodeEventsRequest struct { @@ -1221,16 +1230,14 @@ func (n *Node) Copy() *Node { nn.Reserved = nn.Reserved.Copy() nn.Links = helper.CopyMapStringString(nn.Links) nn.Meta = helper.CopyMapStringString(nn.Meta) - nn.NodeEvents = copyNodeEvents(n) + nn.NodeEvents = copyNodeEvents(n.NodeEvents) return nn } -func copyNodeEvents(first *Node) []*NodeEvent { - nodeEvents := make([]*NodeEvent, 0) - for _, e := range first.NodeEvents { - nodeEvents = append(nodeEvents, e) - } - return nodeEvents +func copyNodeEvents(first []*NodeEvent) []*NodeEvent { + second := make([]*NodeEvent, len(first)) + copy(second, first) + return second } // TerminalStatus returns if the current status is terminal and