From 57ddd511ccf2f191e1f816d75a1bc521ae6a2be2 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 13 Mar 2018 17:52:12 -0700 Subject: [PATCH] fixes --- command/node_status.go | 7 +- nomad/fsm.go | 16 +-- nomad/fsm_test.go | 4 +- nomad/node_endpoint.go | 50 ++++----- nomad/state/state_store.go | 122 +++++++++++----------- nomad/state/state_store_test.go | 178 ++++++++++++++++---------------- nomad/structs/structs.go | 119 +++++++++++---------- 7 files changed, 255 insertions(+), 241 deletions(-) diff --git a/command/node_status.go b/command/node_status.go index 60fd5c358..85b25bb76 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -389,7 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { - c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) c.outputNodeEvent(node.NodeEvents) } @@ -418,11 +417,11 @@ func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { } func formatEventDetails(details map[string]string) string { - var output string + output := make([]string, 0, len(details)) for k, v := range details { - output += fmt.Sprintf("%s: %s, ", k, v) + output = append(output, fmt.Sprintf("%s: %s, ", k, v)) } - return output + return strings.Join(output, ", ") } func (c *NodeStatusCommand) formatAttributes(node *api.Node) { diff --git a/nomad/fsm.go b/nomad/fsm.go index 2b6329119..de51dfed6 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -236,8 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyACLTokenBootstrap(buf[1:], log.Index) case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) - case structs.AddNodeEventsType: - return n.applyAddNodeEventType(buf[1:], log.Index) + case structs.UpsertNodeEventsType: + return n.applyUpsertNodeEventType(buf[1:], log.Index) } // Check enterprise only message types. @@ -630,17 +630,17 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} return n.reconcileQueuedAllocations(index) } -// applyAddNodeEventType applies a node event to the set of currently-available -// events. -func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { +// applyUpsertNodeEventType tracks the given node events. +func (n *nomadFSM) applyUpsertNodeEventType(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now()) var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { - n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventRequest: %v", err) + n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventsRequest: %v", err) return err } - if err := n.state.AddNodeEvent(index, req.NodeEvents); err != nil { - n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil { + n.logger.Printf("[ERR] nomad.fsm: failed to add node events: %v", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 5d316cb66..6205c7038 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -74,7 +74,7 @@ func makeLog(buf []byte) *raft.Log { } } -func TestFSM_ApplyNodeEvent(t *testing.T) { +func TestFSM_UpsertNodeEvents(t *testing.T) { t.Parallel() require := require.New(t) fsm := testFSM(t) @@ -100,7 +100,7 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { NodeEvents: allEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } - buf, err := structs.Encode(structs.AddNodeEventsType, req) + buf, err := structs.Encode(structs.UpsertNodeEventsType, req) require.Nil(err) // the response in this case will be an error diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index ca7c0cb80..5b78190c7 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,29 +52,6 @@ type Node struct { updatesLock sync.Mutex } -func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { - if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { - return err - } - defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) - - if args.NodeEvents == nil { - err := fmt.Errorf("No event to add; node event map is nil") - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) - return err - } - - _, index, err := n.srv.raftApply(structs.AddNodeEventsType, args) - - if err != nil { - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) - return err - } - - reply.Index = index - return nil -} - // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.Register", args, args, reply); done { @@ -1380,3 +1357,30 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, n.srv.setQueryMeta(&reply.QueryMeta) return nil } + +func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { + if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now()) + + if len(args.NodeEvents) == 0 { + return fmt.Errorf("no node events given") + } + for nodeID, events := range args.NodeEvents { + if len(events) == 0 { + return fmt.Errorf("no node events given for node %q", nodeID) + } + } + + // TODO ACLs + + _, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args) + if err != nil { + n.srv.logger.Printf("[ERR] nomad.node upserting node events failed: %v", err) + return err + } + + reply.Index = index + return nil +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 58399e56b..8a6f4ce50 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -537,10 +537,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { Subsystem: "Cluster", Timestamp: node.StatusUpdatedAt, } - - node.NodeEvents = make([]*structs.NodeEvent, 0, 1) - node.NodeEvents = append(node.NodeEvents, nodeEvent) - + node.NodeEvents = []*structs.NodeEvent{nodeEvent} node.CreateIndex = index node.ModifyIndex = index } @@ -634,8 +631,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er // Copy the existing node existingNode := existing.(*structs.Node) - copyNode := new(structs.Node) - *copyNode = *existingNode + copyNode := existingNode.Copy() // Update the drain in the copy copyNode.Drain = drain @@ -653,6 +649,63 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er return nil } +// UpsertNodeEvents adds the node events to the nodes, rotating events as +// necessary. +func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error { + txn := s.db.Txn(true) + defer txn.Abort() + + for nodeID, events := range nodeEvents { + if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil { + return err + } + } + + txn.Commit() + return nil +} + +// upsertNodeEvent upserts a node event for a respective node. It also maintains +// that a fixed number of node events are ever stored simultaneously, deleting +// older events once this bound has been reached. +func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error { + // Lookup the node + existing, err := txn.First("nodes", "id", nodeID) + if err != nil { + return fmt.Errorf("node lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("node not found") + } + + // Copy the existing node + existingNode := existing.(*structs.Node) + copyNode := existingNode.Copy() + + // Add the events, updating the indexes + for _, e := range events { + e.CreateIndex = index + e.ModifyIndex = index + copyNode.NodeEvents = append(copyNode.NodeEvents, e) + } + + // Keep node events pruned to not exceed the max allowed + if l := len(copyNode.NodeEvents); l > structs.MaxRetainedNodeEvents { + delta := l - structs.MaxRetainedNodeEvents + copyNode.NodeEvents = copyNode.NodeEvents[delta:] + } + + // Insert the node + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} + // NodeByID is used to lookup a node by ID func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { txn := s.db.Txn(false) @@ -3693,60 +3746,3 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } } } - -// addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, events map[string][]*structs.NodeEvent) error { - txn := s.db.Txn(true) - defer txn.Abort() - - err := s.upsertNodeEvents(index, events, txn) - txn.Commit() - return err -} - -// upsertNodeEvent upserts a node event for a respective node. It also maintains -// that only 10 node events are ever stored simultaneously, deleting older -// events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent, txn *memdb.Txn) error { - - for nodeID, events := range nodeEvents { - ws := memdb.NewWatchSet() - node, err := s.NodeByID(ws, nodeID) - - if err != nil { - return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) - } - - if node == nil { - return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) - } - - // Copy the existing node - copyNode := node.Copy() - - nodeEvents := node.NodeEvents - - for _, e := range events { - e.CreateIndex = index - e.ModifyIndex = index - - // keep node events pruned to below 10 simultaneously - if len(nodeEvents) >= structs.MaxRetainedNodeEvents { - delta := len(nodeEvents) - structs.MaxRetainedNodeEvents - nodeEvents = nodeEvents[delta+1:] - } - nodeEvents = append(nodeEvents, e) - copyNode.NodeEvents = nodeEvents - } - - // Insert the node - if err := txn.Insert("nodes", copyNode); err != nil { - return fmt.Errorf("node update failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } - } - - return nil -} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index b6553654f..acba103c4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -748,6 +748,95 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { } } +func TestStateStore_AddSingleNodeEvent(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + // We create a new node event every time we register a node + err := state.UpsertNode(1000, node) + require.Nil(err) + + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) + + // Create a watchset so we can test that AddNodeEvent fires the watch + ws := memdb.NewWatchSet() + _, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + nodeEvents := map[string][]*structs.NodeEvent{ + node.ID: {nodeEvent}, + } + err = state.UpsertNodeEvents(uint64(1001), nodeEvents) + require.Nil(err) + + require.True(watchFired(ws)) + + ws = memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(2, len(out.NodeEvents)) + require.Equal(nodeEvent, out.NodeEvents[1]) +} + +// To prevent stale node events from accumulating, we limit the number of +// stored node events to 10. +func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) + + var out *structs.Node + for i := 1; i <= 20; i++ { + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + nodeEvent := &structs.NodeEvent{ + Message: fmt.Sprintf("%dith failed", i), + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + + nodeEvents := map[string][]*structs.NodeEvent{ + out.ID: {nodeEvent}, + } + err := state.UpsertNodeEvents(uint64(i), nodeEvents) + require.Nil(err) + + require.True(watchFired(ws)) + ws = memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + } + + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(10, len(out.NodeEvents)) + require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) + require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) +} + func TestStateStore_Nodes(t *testing.T) { state := testStateStore(t) var nodes []*structs.Node @@ -6469,95 +6558,6 @@ func TestStateStore_Abandon(t *testing.T) { } } -func TestStateStore_AddSingleNodeEvent(t *testing.T) { - require := require.New(t) - state := testStateStore(t) - - node := mock.Node() - - // We create a new node event every time we register a node - err := state.UpsertNode(1000, node) - require.Nil(err) - - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) - - // Create a watchset so we can test that AddNodeEvent fires the watch - ws := memdb.NewWatchSet() - _, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - nodeEvent := &structs.NodeEvent{ - Message: "failed", - Subsystem: "Driver", - Timestamp: time.Now().Unix(), - } - nodeEvents := map[string][]*structs.NodeEvent{ - node.ID: {nodeEvent}, - } - err = state.AddNodeEvent(uint64(1001), nodeEvents) - require.Nil(err) - - require.True(watchFired(ws)) - - ws = memdb.NewWatchSet() - out, err := state.NodeByID(ws, node.ID) - require.Nil(err) - - require.Equal(2, len(out.NodeEvents)) - require.Equal(nodeEvent, out.NodeEvents[1]) -} - -// To prevent stale node events from accumulating, we limit the number of -// stored node events to 10. -func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { - require := require.New(t) - state := testStateStore(t) - - node := mock.Node() - - err := state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) - - var out *structs.Node - for i := 1; i <= 20; i++ { - ws := memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - nodeEvent := &structs.NodeEvent{ - Message: fmt.Sprintf("%dith failed", i), - Subsystem: "Driver", - Timestamp: time.Now().Unix(), - } - - nodeEvents := map[string][]*structs.NodeEvent{ - out.ID: {nodeEvent}, - } - err := state.AddNodeEvent(uint64(i), nodeEvents) - require.Nil(err) - - require.True(watchFired(ws)) - ws = memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - } - - ws := memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - require.Equal(10, len(out.NodeEvents)) - require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) - require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) -} - // watchFired is a helper for unit tests that returns if the given watch set // fired (it doesn't care which watch actually fired). This uses a fixed // timeout since we already expect the event happened before calling this and diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 78b507025..e3fbe9cf7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -74,7 +74,7 @@ const ( ACLTokenDeleteRequestType ACLTokenBootstrapRequestType AutopilotRequestType - AddNodeEventsType + UpsertNodeEventsType ) const ( @@ -1055,6 +1055,60 @@ type NodeConnQueryResponse struct { QueryMeta } +// EmitNodeEventsRequest is a request to update the node events source +// with a new client-side event +type EmitNodeEventsRequest struct { + // NodeEvents are a map where the key is a node id, and value is a list of + // events for that node + NodeEvents map[string][]*NodeEvent + + WriteRequest +} + +// EmitNodeEventsResponse is a response to the client about the status of +// the node event source update. +type EmitNodeEventsResponse struct { + Index uint64 + WriteMeta +} + +// TODO needs to be a more specific name +// Subsystem denotes the subsystem where a node event took place. +type Subsystem string + +const ( + Drain Subsystem = "Drain" + Driver Subsystem = "Driver" + Heartbeat Subsystem = "Heartbeat" + Cluster Subsystem = "Cluster" +) + +// NodeEvent is a single unit representing a node’s state change +type NodeEvent struct { + Message string + Subsystem Subsystem + Details map[string]string + Timestamp int64 + CreateIndex uint64 + ModifyIndex uint64 +} + +func (ne *NodeEvent) String() string { + var details []string + for k, v := range ne.Details { + details = append(details, fmt.Sprintf("%s: %s", k, v)) + } + + return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), strings.Join(details, ","), ne.Timestamp) +} + +func (ne *NodeEvent) Copy() *NodeEvent { + c := new(NodeEvent) + *c = *ne + c.Details = helper.CopyMapStringString(ne.Details) + return c +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready" @@ -1167,53 +1221,6 @@ type Node struct { ModifyIndex uint64 } -// Subsystem denotes the subsystem where a node event took place. -type Subsystem string - -const ( - Drain Subsystem = "Drain" - Driver Subsystem = "Driver" - Heartbeat Subsystem = "Heartbeat" - Cluster Subsystem = "CLuster" -) - -// NodeEvent is a single unit representing a node’s state change -type NodeEvent struct { - Message string - Subsystem Subsystem - Details map[string]string - Timestamp int64 - - CreateIndex uint64 - ModifyIndex uint64 -} - -func (ne *NodeEvent) String() string { - var details string - for k, v := range ne.Details { - details = fmt.Sprintf("%s: %s", k, v) - } - - return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), details, ne.Timestamp) -} - -// EmitNodeEventsRequest is a request to update the node events source -// with a new client-side event -type EmitNodeEventsRequest struct { - // NodeEvents are a map where the key is a node id, and value is a list of - // events for that node - NodeEvents map[string][]*NodeEvent - - WriteRequest -} - -// EmitNodeEventsResponse is a response to the client about the status of -// the node event source update. -type EmitNodeEventsResponse struct { - Index uint64 - WriteMeta -} - // Ready returns if the node is ready for running allocations func (n *Node) Ready() bool { return n.Status == NodeStatusReady && !n.Drain @@ -1234,10 +1241,18 @@ func (n *Node) Copy() *Node { return nn } -func copyNodeEvents(first []*NodeEvent) []*NodeEvent { - second := make([]*NodeEvent, len(first)) - copy(second, first) - return second +// copyNodeEvents is a helper to copy a list of NodeEvent's +func copyNodeEvents(events []*NodeEvent) []*NodeEvent { + l := len(events) + if l == 0 { + return nil + } + + c := make([]*NodeEvent, l) + for i, event := range events { + c[i] = event.Copy() + } + return c } // TerminalStatus returns if the current status is terminal and