mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
RPC/FSM/State Store for Eligibility
This commit is contained in:
committed by
Michael Schurter
parent
5c101de725
commit
dcafa8b460
16
nomad/fsm.go
16
nomad/fsm.go
@@ -242,6 +242,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
|
||||
return n.applyBatchDeregisterJob(buf[1:], log.Index)
|
||||
case structs.AllocUpdateDesiredTransitionRequestType:
|
||||
return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
|
||||
case structs.NodeUpdateEligibilityRequestType:
|
||||
return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
|
||||
}
|
||||
|
||||
// Check enterprise only message types.
|
||||
@@ -335,6 +337,20 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now())
|
||||
var req structs.NodeUpdateEligibilityRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeEligibility failed: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
|
||||
var req structs.JobRegisterRequest
|
||||
|
||||
@@ -316,6 +316,67 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
|
||||
require.Equal(node.DrainStrategy, strategy)
|
||||
}
|
||||
|
||||
func TestFSM_UpdateNodeEligibility(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
fsm := testFSM(t)
|
||||
|
||||
node := mock.Node()
|
||||
req := structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
}
|
||||
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
|
||||
require.Nil(err)
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
require.Nil(resp)
|
||||
|
||||
// Set the eligibility
|
||||
req2 := structs.NodeUpdateEligibilityRequest{
|
||||
NodeID: node.ID,
|
||||
Eligibility: structs.NodeSchedulingIneligible,
|
||||
}
|
||||
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
|
||||
require.Nil(err)
|
||||
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
require.Nil(resp)
|
||||
|
||||
// Lookup the node and check
|
||||
node, err = fsm.State().NodeByID(nil, req.Node.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(node.SchedulingEligibility, structs.NodeSchedulingIneligible)
|
||||
|
||||
// Update the drain
|
||||
strategy := &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{
|
||||
Deadline: 10 * time.Second,
|
||||
},
|
||||
}
|
||||
req3 := structs.NodeUpdateDrainRequest{
|
||||
NodeID: node.ID,
|
||||
DrainStrategy: strategy,
|
||||
}
|
||||
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req3)
|
||||
require.Nil(err)
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
require.Nil(resp)
|
||||
|
||||
// Try forcing eligibility
|
||||
req4 := structs.NodeUpdateEligibilityRequest{
|
||||
NodeID: node.ID,
|
||||
Eligibility: structs.NodeSchedulingEligible,
|
||||
}
|
||||
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req4)
|
||||
require.Nil(err)
|
||||
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
require.NotNil(resp)
|
||||
err, ok := resp.(error)
|
||||
require.True(ok)
|
||||
require.Contains(err.Error(), "draining")
|
||||
}
|
||||
|
||||
func TestFSM_RegisterJob(t *testing.T) {
|
||||
t.Parallel()
|
||||
fsm := testFSM(t)
|
||||
|
||||
@@ -456,6 +456,62 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateEligibility is used to update the scheduling eligibility of a node
|
||||
func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
if done, err := n.srv.forward("Node.UpdateEligibility", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "update_eligibility"}, time.Now())
|
||||
|
||||
// Check node write permissions
|
||||
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
// Verify the arguments
|
||||
if args.NodeID == "" {
|
||||
return fmt.Errorf("missing node ID for setting scheduling eligibility")
|
||||
}
|
||||
|
||||
// Look for the node
|
||||
snap, err := n.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ws := memdb.NewWatchSet()
|
||||
node, err := snap.NodeByID(ws, args.NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if node == nil {
|
||||
return fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
if node.DrainStrategy != nil && args.Eligibility == structs.NodeSchedulingEligible {
|
||||
return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
|
||||
}
|
||||
|
||||
// Commit this update via Raft
|
||||
outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args)
|
||||
if err != nil {
|
||||
n.srv.logger.Printf("[ERR] nomad.client: eligibility update failed: %v", err)
|
||||
return err
|
||||
}
|
||||
if outErr != nil {
|
||||
if err, ok := outErr.(error); ok && err != nil {
|
||||
n.srv.logger.Printf("[ERR] nomad.client: eligibility update failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Set the reply index
|
||||
reply.Index = index
|
||||
return nil
|
||||
}
|
||||
|
||||
// Evaluate is used to force a re-evaluation of the node
|
||||
func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error {
|
||||
if done, err := n.srv.forward("Node.Evaluate", args, args, reply); done {
|
||||
|
||||
@@ -980,6 +980,97 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestClientEndpoint_UpdateEligibility(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
node := mock.Node()
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.NodeUpdateResponse
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
|
||||
|
||||
// Update the eligibility
|
||||
dereg := &structs.NodeUpdateEligibilityRequest{
|
||||
NodeID: node.ID,
|
||||
Eligibility: structs.NodeSchedulingIneligible,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp2 structs.GenericResponse
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp2))
|
||||
require.NotZero(resp2.Index)
|
||||
|
||||
// Check for the node in the FSM
|
||||
state := s1.fsm.State()
|
||||
out, err := state.NodeByID(nil, node.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible)
|
||||
}
|
||||
|
||||
func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1, root := TestACLServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
require := require.New(t)
|
||||
|
||||
// Create the node
|
||||
node := mock.Node()
|
||||
state := s1.fsm.State()
|
||||
|
||||
require.Nil(state.UpsertNode(1, node), "UpsertNode")
|
||||
|
||||
// Create the policy and tokens
|
||||
validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
|
||||
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
|
||||
|
||||
// Update the status without a token and expect failure
|
||||
dereg := &structs.NodeUpdateEligibilityRequest{
|
||||
NodeID: node.ID,
|
||||
Eligibility: structs.NodeSchedulingIneligible,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
{
|
||||
var resp structs.GenericResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp)
|
||||
require.NotNil(err, "RPC")
|
||||
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try with a valid token
|
||||
dereg.AuthToken = validToken.SecretID
|
||||
{
|
||||
var resp structs.GenericResponse
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC")
|
||||
}
|
||||
|
||||
// Try with a invalid token
|
||||
dereg.AuthToken = invalidToken.SecretID
|
||||
{
|
||||
var resp structs.GenericResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp)
|
||||
require.NotNil(err, "RPC")
|
||||
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
|
||||
}
|
||||
|
||||
// Try with a root token
|
||||
dereg.AuthToken = root.SecretID
|
||||
{
|
||||
var resp structs.GenericResponse
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEndpoint_GetNode(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
|
||||
@@ -509,7 +509,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
|
||||
|
||||
// UpsertNode is used to register a node or update a node definition
|
||||
// This is assumed to be triggered by the client, so we retain the value
|
||||
// of drain which is set by the scheduler.
|
||||
// of drain/eligibility which is set by the scheduler.
|
||||
func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
@@ -525,10 +525,12 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
|
||||
exist := existing.(*structs.Node)
|
||||
node.CreateIndex = exist.CreateIndex
|
||||
node.ModifyIndex = index
|
||||
node.Drain = exist.Drain // Retain the drain mode
|
||||
|
||||
// Retain node events that have already been set on the node
|
||||
node.Events = exist.Events
|
||||
|
||||
node.Drain = exist.Drain // Retain the drain mode
|
||||
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
|
||||
} else {
|
||||
// Because this is the first time the node is being registered, we should
|
||||
// also create a node registration event
|
||||
@@ -659,6 +661,46 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain *structs
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateNodeEligibility is used to update the scheduling eligibility of a node
|
||||
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string) error {
|
||||
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
|
||||
// Lookup the node
|
||||
existing, err := txn.First("nodes", "id", nodeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node lookup failed: %v", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
// Copy the existing node
|
||||
existingNode := existing.(*structs.Node)
|
||||
copyNode := existingNode.Copy()
|
||||
|
||||
// Check if this is a valid action
|
||||
if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
|
||||
return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
|
||||
}
|
||||
|
||||
// Update the eligibility in the copy
|
||||
copyNode.SchedulingEligibility = eligibility
|
||||
copyNode.ModifyIndex = index
|
||||
|
||||
// Insert the node
|
||||
if err := txn.Insert("nodes", copyNode); err != nil {
|
||||
return fmt.Errorf("node update failed: %v", err)
|
||||
}
|
||||
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
txn.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpsertNodeEvents adds the node events to the nodes, rotating events as
|
||||
// necessary.
|
||||
func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
|
||||
|
||||
@@ -822,6 +822,52 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
|
||||
require.Equal(uint64(20), out.Events[len(out.Events)-1].CreateIndex)
|
||||
}
|
||||
|
||||
func TestStateStore_UpdateNodeEligibility(t *testing.T) {
|
||||
require := require.New(t)
|
||||
state := testStateStore(t)
|
||||
node := mock.Node()
|
||||
|
||||
err := state.UpsertNode(1000, node)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
expectedEligibility := structs.NodeSchedulingIneligible
|
||||
|
||||
// Create a watchset so we can test that update node drain fires the watch
|
||||
ws := memdb.NewWatchSet()
|
||||
if _, err := state.NodeByID(ws, node.ID); err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
out, err := state.NodeByID(ws, node.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(out.SchedulingEligibility, expectedEligibility)
|
||||
require.EqualValues(1001, out.ModifyIndex)
|
||||
|
||||
index, err := state.Index("nodes")
|
||||
require.Nil(err)
|
||||
require.EqualValues(1001, index)
|
||||
require.False(watchFired(ws))
|
||||
|
||||
// Set a drain strategy
|
||||
expectedDrain := &structs.DrainStrategy{
|
||||
DrainSpec: structs.DrainSpec{
|
||||
Deadline: -1 * time.Second,
|
||||
},
|
||||
}
|
||||
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain))
|
||||
|
||||
// Try to set the node to eligible
|
||||
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible)
|
||||
require.NotNil(err)
|
||||
require.Contains(err.Error(), "while it is draining")
|
||||
}
|
||||
|
||||
func TestStateStore_Nodes(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
var nodes []*structs.Node
|
||||
|
||||
@@ -79,6 +79,7 @@ const (
|
||||
UpsertNodeEventsType
|
||||
JobBatchDeregisterRequestType
|
||||
AllocUpdateDesiredTransitionRequestType
|
||||
NodeUpdateEligibilityRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -310,6 +311,13 @@ type NodeUpdateDrainRequest struct {
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// NodeUpdateEligibilityRequest is used for updating the scheduling eligibility
|
||||
type NodeUpdateEligibilityRequest struct {
|
||||
NodeID string
|
||||
Eligibility string
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// NodeEvaluateRequest is used to re-evaluate the node
|
||||
type NodeEvaluateRequest struct {
|
||||
NodeID string
|
||||
|
||||
Reference in New Issue
Block a user