This commit is contained in:
Alex Dadgar
2018-03-13 17:52:12 -07:00
parent 319f80907c
commit 57ddd511cc
7 changed files with 255 additions and 241 deletions

View File

@@ -389,7 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
}
func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
c.outputNodeEvent(node.NodeEvents)
}
@@ -418,11 +417,11 @@ func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
}
func formatEventDetails(details map[string]string) string {
var output string
output := make([]string, 0, len(details))
for k, v := range details {
output += fmt.Sprintf("%s: %s, ", k, v)
output = append(output, fmt.Sprintf("%s: %s", k, v))
}
return output
return strings.Join(output, ", ")
}
func (c *NodeStatusCommand) formatAttributes(node *api.Node) {
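For reference, a minimal, self-contained sketch of the refactored helper as it reads after this change, assuming only the standard library (the receiver and CLI plumbing are omitted):

package main

import (
	"fmt"
	"strings"
)

// formatEventDetails renders a details map as "k1: v1, k2: v2". Note that
// map iteration order is randomized in Go, so the pair order will vary.
func formatEventDetails(details map[string]string) string {
	parts := make([]string, 0, len(details))
	for k, v := range details {
		parts = append(parts, fmt.Sprintf("%s: %s", k, v))
	}
	return strings.Join(parts, ", ")
}

func main() {
	fmt.Println(formatEventDetails(map[string]string{"driver": "docker"}))
}

Building the pieces in a slice and joining once avoids both repeated string reallocation and a trailing separator.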

View File

@@ -236,8 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.AddNodeEventsType:
return n.applyAddNodeEventType(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEventType(buf[1:], log.Index)
}
// Check enterprise only message types.
@@ -630,17 +630,17 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
return n.reconcileQueuedAllocations(index)
}
// applyAddNodeEventType applies a node event to the set of currently-available
// events.
func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} {
// applyUpsertNodeEventType tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEventType(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventRequest: %v", err)
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventsRequest: %v", err)
return err
}
if err := n.state.AddNodeEvent(index, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err)
if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to add node events: %v", err)
return err
}
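The switch in nomadFSM.Apply dispatches raft log entries on a leading message-type byte. A standalone sketch of that pattern follows; the type constant's value and the handler body are illustrative stand-ins, not Nomad's actual implementation:

package main

import "fmt"

// Illustrative stand-in for the real message-type constant.
const UpsertNodeEventsType byte = 20

// apply demultiplexes a raft log entry on its leading type byte, as the
// switch in nomadFSM.Apply does; handlers receive the remaining payload.
func apply(buf []byte) interface{} {
	switch buf[0] {
	case UpsertNodeEventsType:
		return applyUpsertNodeEvents(buf[1:])
	}
	return fmt.Errorf("unrecognized message type %d", buf[0])
}

func applyUpsertNodeEvents(payload []byte) interface{} {
	// The real handler decodes the payload into an EmitNodeEventsRequest
	// and writes it to the state store; here we just report the size.
	return fmt.Sprintf("would decode %d bytes", len(payload))
}

func main() {
	fmt.Println(apply(append([]byte{UpsertNodeEventsType}, "payload"...)))
}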

View File

@@ -74,7 +74,7 @@ func makeLog(buf []byte) *raft.Log {
}
}
func TestFSM_ApplyNodeEvent(t *testing.T) {
func TestFSM_UpsertNodeEvents(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
@@ -100,7 +100,7 @@ func TestFSM_ApplyNodeEvent(t *testing.T) {
NodeEvents: allEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf, err := structs.Encode(structs.AddNodeEventsType, req)
buf, err := structs.Encode(structs.UpsertNodeEventsType, req)
require.Nil(err)
// the response in this case will be an error
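structs.Encode pairs with that dispatch by serializing the request and prefixing the message-type byte. A hedged stand-in, using JSON in place of Nomad's actual codec:

package main

import (
	"encoding/json"
	"fmt"
)

// encode serializes msg and prefixes the message-type byte, mirroring the
// shape of the buffer the test feeds to fsm.Apply. JSON is a stand-in codec.
func encode(t byte, msg interface{}) ([]byte, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	return append([]byte{t}, b...), nil
}

func main() {
	buf, err := encode(20, map[string]string{"Region": "global"})
	fmt.Println(err, buf[0], len(buf)) // <nil> 20 <payload length + 1>
}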

View File

@@ -52,29 +52,6 @@ type Node struct {
updatesLock sync.Mutex
}
func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now())
if args.NodeEvents == nil {
err := fmt.Errorf("No event to add; node event map is nil")
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err)
return err
}
_, index, err := n.srv.raftApply(structs.AddNodeEventsType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err)
return err
}
reply.Index = index
return nil
}
// Register is used to upsert a client that is available for scheduling
func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := n.srv.forward("Node.Register", args, args, reply); done {
@@ -1380,3 +1357,30 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now())
if len(args.NodeEvents) == 0 {
return fmt.Errorf("no node events given")
}
for nodeID, events := range args.NodeEvents {
if len(events) == 0 {
return fmt.Errorf("no node events given for node %q", nodeID)
}
}
// TODO ACLs
_, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.node upserting node events failed: %v", err)
return err
}
reply.Index = index
return nil
}
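For reference, a self-contained sketch of building a request that passes the validation above; the stand-in types mirror the fields used in this diff, and the node ID is made up:

package main

import (
	"fmt"
	"time"
)

// Stand-in types mirroring the diff's structs fields; illustrative only.
type NodeEvent struct {
	Message   string
	Subsystem string
	Timestamp int64
}

type EmitNodeEventsRequest struct {
	NodeEvents map[string][]*NodeEvent
}

// validate mirrors the checks EmitEvents performs before the raft apply:
// a non-empty map, and a non-empty event list per node.
func validate(args *EmitNodeEventsRequest) error {
	if len(args.NodeEvents) == 0 {
		return fmt.Errorf("no node events given")
	}
	for nodeID, events := range args.NodeEvents {
		if len(events) == 0 {
			return fmt.Errorf("no node events given for node %q", nodeID)
		}
	}
	return nil
}

func main() {
	req := &EmitNodeEventsRequest{
		NodeEvents: map[string][]*NodeEvent{
			"node-1234": {{Message: "driver failure", Subsystem: "Driver", Timestamp: time.Now().Unix()}},
		},
	}
	fmt.Println(validate(req)) // <nil>
}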

View File

@@ -537,10 +537,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
Subsystem: "Cluster",
Timestamp: node.StatusUpdatedAt,
}
node.NodeEvents = make([]*structs.NodeEvent, 0, 1)
node.NodeEvents = append(node.NodeEvents, nodeEvent)
node.NodeEvents = []*structs.NodeEvent{nodeEvent}
node.CreateIndex = index
node.ModifyIndex = index
}
@@ -634,8 +631,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := new(structs.Node)
*copyNode = *existingNode
copyNode := existingNode.Copy()
// Update the drain in the copy
copyNode.Drain = drain
@@ -653,6 +649,63 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er
return nil
}
// UpsertNodeEvents adds the node events to their corresponding nodes,
// rotating out the oldest events as necessary.
func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()
for nodeID, events := range nodeEvents {
if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}
// upsertNodeEvents upserts node events for a given node. It also ensures
// that at most a fixed number of node events are stored at any time,
// pruning the oldest events once this bound is exceeded.
func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
return fmt.Errorf("node lookup failed: %v", err)
}
if existing == nil {
return fmt.Errorf("node not found")
}
// Copy the existing node
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()
// Add the events, updating the indexes
for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index
copyNode.NodeEvents = append(copyNode.NodeEvents, e)
}
// Keep node events pruned to not exceed the max allowed
if l := len(copyNode.NodeEvents); l > structs.MaxRetainedNodeEvents {
delta := l - structs.MaxRetainedNodeEvents
copyNode.NodeEvents = copyNode.NodeEvents[delta:]
}
// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
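The retention trim above keeps only the newest MaxRetainedNodeEvents entries by reslicing from the front. A standalone sketch of that reslice; the cap value of 10 matches the tests below:

package main

import "fmt"

// maxRetainedNodeEvents mirrors structs.MaxRetainedNodeEvents (10 per the tests).
const maxRetainedNodeEvents = 10

// prune drops the oldest events once the slice exceeds the retention cap,
// the same reslice performed at the end of upsertNodeEvents.
func prune(events []string) []string {
	if l := len(events); l > maxRetainedNodeEvents {
		events = events[l-maxRetainedNodeEvents:]
	}
	return events
}

func main() {
	var evs []string
	for i := 1; i <= 12; i++ {
		evs = append(evs, fmt.Sprintf("event-%d", i))
	}
	fmt.Println(prune(evs)) // [event-3 ... event-12]
}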
// NodeByID is used to lookup a node by ID
func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
txn := s.db.Txn(false)
@@ -3693,60 +3746,3 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}
}
}
// addNodeEvent is a function which wraps upsertNodeEvent
func (s *StateStore) AddNodeEvent(index uint64, events map[string][]*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()
err := s.upsertNodeEvents(index, events, txn)
txn.Commit()
return err
}
// upsertNodeEvent upserts a node event for a respective node. It also maintains
// that only 10 node events are ever stored simultaneously, deleting older
// events once this bound has been reached.
func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent, txn *memdb.Txn) error {
for nodeID, events := range nodeEvents {
ws := memdb.NewWatchSet()
node, err := s.NodeByID(ws, nodeID)
if err != nil {
return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err)
}
if node == nil {
return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID)
}
// Copy the existing node
copyNode := node.Copy()
nodeEvents := node.NodeEvents
for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index
// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= structs.MaxRetainedNodeEvents {
delta := len(nodeEvents) - structs.MaxRetainedNodeEvents
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
copyNode.NodeEvents = nodeEvents
}
// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
return nil
}

View File

@@ -748,6 +748,95 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
}
}
func TestStateStore_AddSingleNodeEvent(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
// We create a new node event every time we register a node
err := state.UpsertNode(1000, node)
require.Nil(err)
require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)
// Create a watchset so we can test that AddNodeEvent fires the watch
ws := memdb.NewWatchSet()
_, err = state.NodeByID(ws, node.ID)
require.Nil(err)
nodeEvent := &structs.NodeEvent{
Message: "failed",
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
nodeEvents := map[string][]*structs.NodeEvent{
node.ID: {nodeEvent},
}
err = state.UpsertNodeEvents(uint64(1001), nodeEvents)
require.Nil(err)
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(2, len(out.NodeEvents))
require.Equal(nodeEvent, out.NodeEvents[1])
}
// To prevent stale node events from accumulating, we limit the number of
// stored node events to 10.
func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)
var out *structs.Node
for i := 1; i <= 20; i++ {
ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
nodeEvent := &structs.NodeEvent{
Message: fmt.Sprintf("event %d failed", i),
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
nodeEvents := map[string][]*structs.NodeEvent{
out.ID: {nodeEvent},
}
err := state.UpsertNodeEvents(uint64(i), nodeEvents)
require.Nil(err)
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
}
ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(10, len(out.NodeEvents))
require.Equal(uint64(11), out.NodeEvents[0].CreateIndex)
require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex)
}
func TestStateStore_Nodes(t *testing.T) {
state := testStateStore(t)
var nodes []*structs.Node
@@ -6469,95 +6558,6 @@ func TestStateStore_Abandon(t *testing.T) {
}
}
func TestStateStore_AddSingleNodeEvent(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
// We create a new node event every time we register a node
err := state.UpsertNode(1000, node)
require.Nil(err)
require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)
// Create a watchset so we can test that AddNodeEvent fires the watch
ws := memdb.NewWatchSet()
_, err = state.NodeByID(ws, node.ID)
require.Nil(err)
nodeEvent := &structs.NodeEvent{
Message: "failed",
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
nodeEvents := map[string][]*structs.NodeEvent{
node.ID: {nodeEvent},
}
err = state.AddNodeEvent(uint64(1001), nodeEvents)
require.Nil(err)
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(2, len(out.NodeEvents))
require.Equal(nodeEvent, out.NodeEvents[1])
}
// To prevent stale node events from accumulating, we limit the number of
// stored node events to 10.
func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)
var out *structs.Node
for i := 1; i <= 20; i++ {
ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
nodeEvent := &structs.NodeEvent{
Message: fmt.Sprintf("%dith failed", i),
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
nodeEvents := map[string][]*structs.NodeEvent{
out.ID: {nodeEvent},
}
err := state.AddNodeEvent(uint64(i), nodeEvents)
require.Nil(err)
require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
}
ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(10, len(out.NodeEvents))
require.Equal(uint64(11), out.NodeEvents[0].CreateIndex)
require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex)
}
// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// timeout since we already expect the event happened before calling this and

View File

@@ -74,7 +74,7 @@ const (
ACLTokenDeleteRequestType
ACLTokenBootstrapRequestType
AutopilotRequestType
AddNodeEventsType
UpsertNodeEventsType
)
const (
@@ -1055,6 +1055,60 @@ type NodeConnQueryResponse struct {
QueryMeta
}
// EmitNodeEventsRequest is a request to update the node events source
// with new client-side events.
type EmitNodeEventsRequest struct {
// NodeEvents is a map where the key is a node ID and the value is a list
// of events for that node.
NodeEvents map[string][]*NodeEvent
WriteRequest
}
// EmitNodeEventsResponse is a response to the client about the status of
// the node event source update.
type EmitNodeEventsResponse struct {
Index uint64
WriteMeta
}
// TODO needs to be a more specific name
// Subsystem denotes the subsystem where a node event took place.
type Subsystem string
const (
Drain Subsystem = "Drain"
Driver Subsystem = "Driver"
Heartbeat Subsystem = "Heartbeat"
Cluster Subsystem = "Cluster"
)
// NodeEvent is a single unit representing a node's state change
type NodeEvent struct {
Message string
Subsystem Subsystem
Details map[string]string
Timestamp int64
CreateIndex uint64
ModifyIndex uint64
}
func (ne *NodeEvent) String() string {
var details []string
for k, v := range ne.Details {
details = append(details, fmt.Sprintf("%s: %s", k, v))
}
return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), strings.Join(details, ","), ne.Timestamp)
}
func (ne *NodeEvent) Copy() *NodeEvent {
c := new(NodeEvent)
*c = *ne
c.Details = helper.CopyMapStringString(ne.Details)
return c
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
@@ -1167,53 +1221,6 @@ type Node struct {
ModifyIndex uint64
}
// Subsystem denotes the subsystem where a node event took place.
type Subsystem string
const (
Drain Subsystem = "Drain"
Driver Subsystem = "Driver"
Heartbeat Subsystem = "Heartbeat"
Cluster Subsystem = "CLuster"
)
// NodeEvent is a single unit representing a nodes state change
type NodeEvent struct {
Message string
Subsystem Subsystem
Details map[string]string
Timestamp int64
CreateIndex uint64
ModifyIndex uint64
}
func (ne *NodeEvent) String() string {
var details string
for k, v := range ne.Details {
details = fmt.Sprintf("%s: %s", k, v)
}
return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), details, ne.Timestamp)
}
// EmitNodeEventsRequest is a request to update the node events source
// with a new client-side event
type EmitNodeEventsRequest struct {
// NodeEvents are a map where the key is a node id, and value is a list of
// events for that node
NodeEvents map[string][]*NodeEvent
WriteRequest
}
// EmitNodeEventsResponse is a response to the client about the status of
// the node event source update.
type EmitNodeEventsResponse struct {
Index uint64
WriteMeta
}
// Ready returns if the node is ready for running allocations
func (n *Node) Ready() bool {
return n.Status == NodeStatusReady && !n.Drain
@@ -1234,10 +1241,18 @@ func (n *Node) Copy() *Node {
return nn
}
func copyNodeEvents(first []*NodeEvent) []*NodeEvent {
second := make([]*NodeEvent, len(first))
copy(second, first)
return second
// copyNodeEvents is a helper to deep-copy a list of NodeEvents
func copyNodeEvents(events []*NodeEvent) []*NodeEvent {
l := len(events)
if l == 0 {
return nil
}
c := make([]*NodeEvent, l)
for i, event := range events {
c[i] = event.Copy()
}
return c
}
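The old copyNodeEvents copied only the slice of pointers, so the copy still aliased the original events. A small demonstration of why the switch to per-event Copy matters (types reduced to a single field for illustration):

package main

import "fmt"

type NodeEvent struct{ Message string }

// shallowCopy reproduces the old behavior: the slice header is new, but
// both slices still point at the same NodeEvent values.
func shallowCopy(first []*NodeEvent) []*NodeEvent {
	second := make([]*NodeEvent, len(first))
	copy(second, first)
	return second
}

func main() {
	orig := []*NodeEvent{{Message: "ok"}}
	cp := shallowCopy(orig)
	cp[0].Message = "mutated"
	fmt.Println(orig[0].Message) // "mutated" — the original changed too
}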
// TerminalStatus returns if the current status is terminal and