mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 10:55:42 +03:00
Emit events based on eligibility
This commit is contained in:
@@ -384,7 +384,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
|
||||
return err
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility); err != nil {
|
||||
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeEligibility failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -451,10 +451,17 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) {
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
require.Nil(resp)
|
||||
|
||||
event := &structs.NodeEvent{
|
||||
Message: "Node marked as ineligible",
|
||||
Subsystem: structs.NodeEventSubsystemCluster,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Set the eligibility
|
||||
req2 := structs.NodeUpdateEligibilityRequest{
|
||||
NodeID: node.ID,
|
||||
Eligibility: structs.NodeSchedulingIneligible,
|
||||
NodeEvent: event,
|
||||
}
|
||||
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
|
||||
require.Nil(err)
|
||||
@@ -466,6 +473,8 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) {
|
||||
node, err = fsm.State().NodeByID(nil, req.Node.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(node.SchedulingEligibility, structs.NodeSchedulingIneligible)
|
||||
require.Len(node.Events, 2)
|
||||
require.Equal(event.Message, node.Events[1].Message)
|
||||
|
||||
// Update the drain
|
||||
strategy := &structs.DrainStrategy{
|
||||
|
||||
@@ -32,6 +32,14 @@ const (
|
||||
NodeDrainEventDrainSet = "Node drain strategy set"
|
||||
NodeDrainEventDrainDisabled = "Node drain disabled"
|
||||
NodeDrainEventDrainUpdated = "Node drain stategy updated"
|
||||
|
||||
// NodeEligibilityEventEligible is used when the nodes eligiblity is marked
|
||||
// eligible
|
||||
NodeEligibilityEventEligible = "Node marked as eligible for scheduling"
|
||||
|
||||
// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
|
||||
// ineligible
|
||||
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"
|
||||
)
|
||||
|
||||
// Node endpoint is used for client interactions
|
||||
@@ -532,6 +540,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
|
||||
if args.NodeID == "" {
|
||||
return fmt.Errorf("missing node ID for setting scheduling eligibility")
|
||||
}
|
||||
if args.NodeEvent != nil {
|
||||
return fmt.Errorf("node event may not be set")
|
||||
}
|
||||
|
||||
// Check that only allowed types are set
|
||||
switch args.Eligibility {
|
||||
@@ -563,6 +574,16 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
|
||||
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
|
||||
}
|
||||
|
||||
// Construct the node event
|
||||
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
|
||||
if node.SchedulingEligibility == args.Eligibility {
|
||||
return nil // Nothing to do
|
||||
} else if args.Eligibility == structs.NodeSchedulingEligible {
|
||||
args.NodeEvent.SetMessage(NodeEligibilityEventEligible)
|
||||
} else {
|
||||
args.NodeEvent.SetMessage(NodeEligibilityEventIneligible)
|
||||
}
|
||||
|
||||
// Commit this update via Raft
|
||||
outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args)
|
||||
if err != nil {
|
||||
|
||||
@@ -1095,6 +1095,8 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
|
||||
out, err := state.NodeByID(nil, node.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible)
|
||||
require.Len(out.Events, 2)
|
||||
require.Equal(NodeEligibilityEventIneligible, out.Events[1].Message)
|
||||
|
||||
// Register a system job
|
||||
job := mock.SystemJob()
|
||||
@@ -1107,6 +1109,11 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
|
||||
require.NotZero(resp3.Index)
|
||||
require.NotZero(resp3.EvalCreateIndex)
|
||||
require.Len(resp3.EvalIDs, 1)
|
||||
|
||||
out, err = state.NodeByID(nil, node.ID)
|
||||
require.Nil(err)
|
||||
require.Len(out.Events, 3)
|
||||
require.Equal(NodeEligibilityEventEligible, out.Events[2].Message)
|
||||
}
|
||||
|
||||
func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
|
||||
|
||||
@@ -688,7 +688,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
|
||||
}
|
||||
|
||||
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
|
||||
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string) error {
|
||||
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {
|
||||
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
@@ -706,6 +706,11 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
|
||||
existingNode := existing.(*structs.Node)
|
||||
copyNode := existingNode.Copy()
|
||||
|
||||
// Add the event if given
|
||||
if event != nil {
|
||||
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
|
||||
}
|
||||
|
||||
// Check if this is a valid action
|
||||
if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
|
||||
return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
|
||||
|
||||
@@ -952,13 +952,20 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility))
|
||||
event := &structs.NodeEvent{
|
||||
Message: "Node marked as ineligible",
|
||||
Subsystem: structs.NodeEventSubsystemCluster,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
out, err := state.NodeByID(ws, node.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(out.SchedulingEligibility, expectedEligibility)
|
||||
require.Len(out.Events, 2)
|
||||
require.Equal(out.Events[1], event)
|
||||
require.EqualValues(1001, out.ModifyIndex)
|
||||
|
||||
index, err := state.Index("nodes")
|
||||
@@ -975,7 +982,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
|
||||
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))
|
||||
|
||||
// Try to set the node to eligible
|
||||
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible)
|
||||
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
|
||||
require.NotNil(err)
|
||||
require.Contains(err.Error(), "while it is draining")
|
||||
}
|
||||
|
||||
@@ -376,6 +376,10 @@ type DrainUpdate struct {
|
||||
type NodeUpdateEligibilityRequest struct {
|
||||
NodeID string
|
||||
Eligibility string
|
||||
|
||||
// NodeEvent is the event added to the node
|
||||
NodeEvent *NodeEvent
|
||||
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user