From dcafa8b46027178a4bb602462f4b209f5a9fe253 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Mon, 26 Feb 2018 16:34:42 -0800
Subject: [PATCH] RPC/FSM/State Store for Eligibility

---
 nomad/fsm.go                    |  18 ++++++
 nomad/fsm_test.go               |  63 ++++++++++++++++++++
 nomad/node_endpoint.go          |  67 +++++++++++++++++++++
 nomad/node_endpoint_test.go     | 100 ++++++++++++++++++++++++++++++++
 nomad/state/state_store.go      |  47 ++++++++++++++-
 nomad/state/state_store_test.go |  44 ++++++++++++++
 nomad/structs/structs.go        |  10 ++++
 7 files changed, 347 insertions(+), 2 deletions(-)

diff --git a/nomad/fsm.go b/nomad/fsm.go
index a946d523f..7df2582db 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -242,6 +242,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyBatchDeregisterJob(buf[1:], log.Index)
 	case structs.AllocUpdateDesiredTransitionRequestType:
 		return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
+	case structs.NodeUpdateEligibilityRequestType:
+		return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
 	}
 
 	// Check enterprise only message types.
@@ -335,6 +337,22 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
 	return nil
 }
 
+// applyNodeEligibilityUpdate decodes a NodeUpdateEligibilityRequest and applies
+// it to the state store, returning any state store error as the apply response.
+func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now())
+	var req structs.NodeUpdateEligibilityRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+
+	if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: UpdateNodeEligibility failed: %v", err)
+		return err
+	}
+	return nil
+}
+
 func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
 	defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
 	var req structs.JobRegisterRequest
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index 922ba2ca6..b834b432f 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -316,6 +316,69 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
 	require.Equal(node.DrainStrategy, strategy)
 }
 
+// TestFSM_UpdateNodeEligibility asserts that an eligibility update is applied
+// by the FSM and that a draining node cannot be forced back to eligible.
+func TestFSM_UpdateNodeEligibility(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	fsm := testFSM(t)
+
+	node := mock.Node()
+	req := structs.NodeRegisterRequest{
+		Node: node,
+	}
+	buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
+	require.Nil(err)
+
+	resp := fsm.Apply(makeLog(buf))
+	require.Nil(resp)
+
+	// Set the eligibility
+	req2 := structs.NodeUpdateEligibilityRequest{
+		NodeID:      node.ID,
+		Eligibility: structs.NodeSchedulingIneligible,
+	}
+	buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
+	require.Nil(err)
+
+	resp = fsm.Apply(makeLog(buf))
+	require.Nil(resp)
+
+	// Lookup the node and check
+	node, err = fsm.State().NodeByID(nil, req.Node.ID)
+	require.Nil(err)
+	require.Equal(structs.NodeSchedulingIneligible, node.SchedulingEligibility)
+
+	// Update the drain
+	strategy := &structs.DrainStrategy{
+		DrainSpec: structs.DrainSpec{
+			Deadline: 10 * time.Second,
+		},
+	}
+	req3 := structs.NodeUpdateDrainRequest{
+		NodeID:        node.ID,
+		DrainStrategy: strategy,
+	}
+	buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req3)
+	require.Nil(err)
+	resp = fsm.Apply(makeLog(buf))
+	require.Nil(resp)
+
+	// Try forcing eligibility
+	req4 := structs.NodeUpdateEligibilityRequest{
+		NodeID:      node.ID,
+		Eligibility: structs.NodeSchedulingEligible,
+	}
+	buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req4)
+	require.Nil(err)
+
+	resp = fsm.Apply(makeLog(buf))
+	require.NotNil(resp)
+	err, ok := resp.(error)
+	require.True(ok)
+	require.Contains(err.Error(), "draining")
+}
+
 func TestFSM_RegisterJob(t *testing.T) {
 	t.Parallel()
 	fsm := testFSM(t)
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 082491e89..f46de1661 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -456,6 +456,73 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
 	return nil
 }
 
+// UpdateEligibility is used to update the scheduling eligibility of a node.
+// The request is rejected if the node is unknown, the eligibility value is not
+// recognized, or the node is draining and is being marked eligible.
+func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
+	reply *structs.GenericResponse) error {
+	if done, err := n.srv.forward("Node.UpdateEligibility", args, args, reply); done {
+		return err
+	}
+	defer metrics.MeasureSince([]string{"nomad", "client", "update_eligibility"}, time.Now())
+
+	// Check node write permissions
+	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
+		return err
+	} else if aclObj != nil && !aclObj.AllowNodeWrite() {
+		return structs.ErrPermissionDenied
+	}
+
+	// Verify the arguments
+	if args.NodeID == "" {
+		return fmt.Errorf("missing node ID for setting scheduling eligibility")
+	}
+
+	// Only known eligibility values may be committed; the state store writes
+	// whatever string it is given.
+	switch args.Eligibility {
+	case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible:
+	default:
+		return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
+	}
+
+	// Look for the node
+	snap, err := n.srv.fsm.State().Snapshot()
+	if err != nil {
+		return err
+	}
+	ws := memdb.NewWatchSet()
+	node, err := snap.NodeByID(ws, args.NodeID)
+	if err != nil {
+		return err
+	}
+	if node == nil {
+		return fmt.Errorf("node not found")
+	}
+
+	// Fail fast here; the state store repeats this check when applying
+	if node.DrainStrategy != nil && args.Eligibility == structs.NodeSchedulingEligible {
+		return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
+	}
+
+	// Commit this update via Raft
+	outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args)
+	if err != nil {
+		n.srv.logger.Printf("[ERR] nomad.client: eligibility update failed: %v", err)
+		return err
+	}
+
+	// The FSM returns state store failures as the apply response
+	if err, ok := outErr.(error); ok && err != nil {
+		n.srv.logger.Printf("[ERR] nomad.client: eligibility update failed: %v", err)
+		return err
+	}
+
+	// Set the reply index
+	reply.Index = index
+	return nil
+}
+
 // Evaluate is used to force a re-evaluation of the node
 func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error {
 	if done, err := n.srv.forward("Node.Evaluate", args, args, reply); done {
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 7649cc2db..87c418d0d 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -980,6 +980,106 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
 	})
 }
 
+// TestClientEndpoint_UpdateEligibility asserts that the RPC persists a valid
+// eligibility change and rejects an unrecognized eligibility value.
+func TestClientEndpoint_UpdateEligibility(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Register a node to operate on
+	node := mock.Node()
+	reg := &structs.NodeRegisterRequest{
+		Node:         node,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.NodeUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
+
+	// Update the eligibility
+	update := &structs.NodeUpdateEligibilityRequest{
+		NodeID:       node.ID,
+		Eligibility:  structs.NodeSchedulingIneligible,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp2 structs.GenericResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", update, &resp2))
+	require.NotZero(resp2.Index)
+
+	// Check for the node in the FSM
+	state := s1.fsm.State()
+	out, err := state.NodeByID(nil, node.ID)
+	require.Nil(err)
+	require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility)
+
+	// An unrecognized eligibility value must be rejected
+	update.Eligibility = "foo"
+	var resp3 structs.GenericResponse
+	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", update, &resp3)
+	require.NotNil(err)
+	require.Contains(err.Error(), "invalid scheduling eligibility")
+}
+
+// TestClientEndpoint_UpdateEligibility_ACL asserts that, against a server
+// created with TestACLServer, the RPC requires a token with node write access.
+func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
+	t.Parallel()
+	s1, root := TestACLServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+	require := require.New(t)
+
+	// Create the node
+	node := mock.Node()
+	state := s1.fsm.State()
+
+	require.Nil(state.UpsertNode(1, node), "UpsertNode")
+
+	// Create the policy and tokens
+	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
+	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
+
+	// Update the eligibility without a token and expect failure
+	update := &structs.NodeUpdateEligibilityRequest{
+		NodeID:       node.ID,
+		Eligibility:  structs.NodeSchedulingIneligible,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	{
+		var resp structs.GenericResponse
+		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", update, &resp)
+		require.NotNil(err, "RPC")
+		require.Equal(structs.ErrPermissionDenied.Error(), err.Error())
+	}
+
+	// Try with a valid token
+	update.AuthToken = validToken.SecretID
+	{
+		var resp structs.GenericResponse
+		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", update, &resp), "RPC")
+	}
+
+	// Try with an invalid token
+	update.AuthToken = invalidToken.SecretID
+	{
+		var resp structs.GenericResponse
+		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", update, &resp)
+		require.NotNil(err, "RPC")
+		require.Equal(structs.ErrPermissionDenied.Error(), err.Error())
+	}
+
+	// Try with a root token
+	update.AuthToken = root.SecretID
+	{
+		var resp structs.GenericResponse
+		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", update, &resp), "RPC")
+	}
+}
+
 func TestClientEndpoint_GetNode(t *testing.T) {
 	t.Parallel()
 	s1 := TestServer(t, nil)
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index cfef5e483..e48a940e8 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -509,7 +509,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
 
 // UpsertNode is used to register a node or update a node definition
 // This is assumed to be triggered by the client, so we retain the value
-// of drain which is set by the scheduler.
+// of drain/eligibility which is set by the scheduler.
 func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
 	txn := s.db.Txn(true)
 	defer txn.Abort()
@@ -525,10 +525,12 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
 		exist := existing.(*structs.Node)
 		node.CreateIndex = exist.CreateIndex
 		node.ModifyIndex = index
-		node.Drain = exist.Drain // Retain the drain mode
 
 		// Retain node events that have already been set on the node
 		node.Events = exist.Events
+
+		node.Drain = exist.Drain                                 // Retain the drain mode
+		node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
 	} else {
 		// Because this is the first time the node is being registered, we should
 		// also create a node registration event
@@ -659,6 +661,47 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain *structs
 	return nil
 }
 
+// UpdateNodeEligibility is used to update the scheduling eligibility of a node.
+// It returns an error if the node does not exist or if the node has a drain
+// strategy set and the requested eligibility is NodeSchedulingEligible.
+func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	// Lookup the node
+	existing, err := txn.First("nodes", "id", nodeID)
+	if err != nil {
+		return fmt.Errorf("node lookup failed: %v", err)
+	}
+	if existing == nil {
+		return fmt.Errorf("node not found")
+	}
+
+	// Copy the existing node so the stored object is not modified in place
+	existingNode := existing.(*structs.Node)
+	copyNode := existingNode.Copy()
+
+	// Check if this is a valid action
+	if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
+		return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
+	}
+
+	// Update the eligibility in the copy
+	copyNode.SchedulingEligibility = eligibility
+	copyNode.ModifyIndex = index
+
+	// Insert the node
+	if err := txn.Insert("nodes", copyNode); err != nil {
+		return fmt.Errorf("node update failed: %v", err)
+	}
+	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
+
+	txn.Commit()
+	return nil
+}
+
 // UpsertNodeEvents adds the node events to the nodes, rotating events as
 // necessary.
 func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 81e17eaab..1bf1467de 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -822,6 +822,50 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
 	require.Equal(uint64(20), out.Events[len(out.Events)-1].CreateIndex)
 }
 
+// TestStateStore_UpdateNodeEligibility asserts that eligibility updates are
+// persisted, fire node watches, and are refused for a draining node.
+func TestStateStore_UpdateNodeEligibility(t *testing.T) {
+	require := require.New(t)
+	state := testStateStore(t)
+	node := mock.Node()
+
+	require.Nil(state.UpsertNode(1000, node))
+
+	expectedEligibility := structs.NodeSchedulingIneligible
+
+	// Create a watchset so we can test that updating eligibility fires the watch
+	ws := memdb.NewWatchSet()
+	_, err := state.NodeByID(ws, node.ID)
+	require.Nil(err)
+
+	require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility))
+	require.True(watchFired(ws))
+
+	ws = memdb.NewWatchSet()
+	out, err := state.NodeByID(ws, node.ID)
+	require.Nil(err)
+	require.Equal(expectedEligibility, out.SchedulingEligibility)
+	require.EqualValues(1001, out.ModifyIndex)
+
+	index, err := state.Index("nodes")
+	require.Nil(err)
+	require.EqualValues(1001, index)
+	require.False(watchFired(ws))
+
+	// Set a drain strategy
+	expectedDrain := &structs.DrainStrategy{
+		DrainSpec: structs.DrainSpec{
+			Deadline: -1 * time.Second,
+		},
+	}
+	require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain))
+
+	// Try to set the node to eligible
+	err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible)
+	require.NotNil(err)
+	require.Contains(err.Error(), "while it is draining")
+}
+
 func TestStateStore_Nodes(t *testing.T) {
 	state := testStateStore(t)
 	var nodes []*structs.Node
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index d28c89fa8..e1d9b0777 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -79,6 +79,7 @@ const (
 	UpsertNodeEventsType
 	JobBatchDeregisterRequestType
 	AllocUpdateDesiredTransitionRequestType
+	NodeUpdateEligibilityRequestType
 )
 
 const (
@@ -310,6 +311,15 @@ type NodeUpdateDrainRequest struct {
 	WriteRequest
 }
 
+// NodeUpdateEligibilityRequest is used for updating the scheduling eligibility
+// of the node identified by NodeID. Eligibility is expected to be one of
+// NodeSchedulingEligible or NodeSchedulingIneligible.
+type NodeUpdateEligibilityRequest struct {
+	NodeID      string
+	Eligibility string
+	WriteRequest
+}
+
 // NodeEvaluateRequest is used to re-evaluate the node
 type NodeEvaluateRequest struct {
 	NodeID string