diff --git a/api/nodes.go b/api/nodes.go index de73e12c4..728705db0 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -57,21 +57,31 @@ type NodeUpdateDrainRequest struct { MarkEligible bool } +// NodeDrainUpdateResponse is used to respond to a node drain update +type NodeDrainUpdateResponse struct { + NodeModifyIndex uint64 + EvalIDs []string + EvalCreateIndex uint64 + WriteMeta +} + // UpdateDrain is used to update the drain strategy for a given node. If // markEligible is true and the drain is being removed, the node will be marked // as having its scheduling being elibile -func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q *WriteOptions) (*WriteMeta, error) { +func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q *WriteOptions) (*NodeDrainUpdateResponse, error) { req := &NodeUpdateDrainRequest{ NodeID: nodeID, DrainSpec: spec, MarkEligible: markEligible, } - wm, err := n.client.write("/v1/node/"+nodeID+"/drain", req, nil, q) + var resp NodeDrainUpdateResponse + wm, err := n.client.write("/v1/node/"+nodeID+"/drain", req, &resp, q) if err != nil { return nil, err } - return wm, nil + resp.WriteMeta = *wm + return &resp, nil } // NodeUpdateEligibilityRequest is used to update the drain specification for a node. @@ -81,8 +91,16 @@ type NodeUpdateEligibilityRequest struct { Eligibility string } +// NodeEligibilityUpdateResponse is used to respond to a node eligibility update +type NodeEligibilityUpdateResponse struct { + NodeModifyIndex uint64 + EvalIDs []string + EvalCreateIndex uint64 + WriteMeta +} + // ToggleEligibility is used to update the scheduling eligibility of the node -func (n *Nodes) ToggleEligibility(nodeID string, eligible bool, q *WriteOptions) (*WriteMeta, error) { +func (n *Nodes) ToggleEligibility(nodeID string, eligible bool, q *WriteOptions) (*NodeEligibilityUpdateResponse, error) { e := structs.NodeSchedulingEligible if !eligible { e = structs.NodeSchedulingIneligible @@ -93,11 +111,13 @@ func (n *Nodes) ToggleEligibility(nodeID string, eligible bool, q *WriteOptions) Eligibility: e, } - wm, err := n.client.write("/v1/node/"+nodeID+"/eligibility", req, nil, q) + var resp NodeEligibilityUpdateResponse + wm, err := n.client.write("/v1/node/"+nodeID+"/eligibility", req, &resp, q) if err != nil { return nil, err } - return wm, nil + resp.WriteMeta = *wm + return &resp, nil } // Allocations is used to return the allocations associated with a node. diff --git a/api/nodes_test.go b/api/nodes_test.go index 6f6d94499..36c13ae1a 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -176,9 +176,9 @@ func TestNodes_ToggleDrain(t *testing.T) { spec := &DrainSpec{ Deadline: 10 * time.Second, } - wm, err := nodes.UpdateDrain(nodeID, spec, false, nil) + drainOut, err := nodes.UpdateDrain(nodeID, spec, false, nil) require.Nil(err) - assertWriteMeta(t, wm) + assertWriteMeta(t, &drainOut.WriteMeta) // Check again out, _, err = nodes.Info(nodeID, nil) @@ -188,9 +188,9 @@ func TestNodes_ToggleDrain(t *testing.T) { } // Toggle off again - wm, err = nodes.UpdateDrain(nodeID, nil, true, nil) + drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil) require.Nil(err) - assertWriteMeta(t, wm) + assertWriteMeta(t, &drainOut.WriteMeta) // Check again out, _, err = nodes.Info(nodeID, nil) @@ -240,11 +240,11 @@ func TestNodes_ToggleEligibility(t *testing.T) { } // Toggle it off - wm, err := nodes.ToggleEligibility(nodeID, false, nil) + eligOut, err := nodes.ToggleEligibility(nodeID, false, nil) if err != nil { t.Fatalf("err: %s", err) } - assertWriteMeta(t, wm) + assertWriteMeta(t, &eligOut.WriteMeta) // Check again out, _, err = nodes.Info(nodeID, nil) @@ -256,11 +256,11 @@ func TestNodes_ToggleEligibility(t *testing.T) { } // Toggle on - wm, err = nodes.ToggleEligibility(nodeID, true, nil) + eligOut, err = nodes.ToggleEligibility(nodeID, true, nil) if err != nil { t.Fatalf("err: %s", err) } - assertWriteMeta(t, wm) + assertWriteMeta(t, &eligOut.WriteMeta) // Check again out, _, err = nodes.Info(nodeID, nil) diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index bad4fc445..f998d4499 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -165,12 +165,12 @@ func (s *HTTPServer) nodeToggleEligibility(resp http.ResponseWriter, req *http.R } s.parseWriteRequest(req, &drainRequest.WriteRequest) - var out structs.GenericResponse + var out structs.NodeEligibilityUpdateResponse if err := s.agent.RPC("Node.UpdateEligibility", &drainRequest, &out); err != nil { return nil, err } setIndex(resp, out.Index) - return nil, nil + return out, nil } func (s *HTTPServer) nodeQuery(resp http.ResponseWriter, req *http.Request, diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 6b3d96c44..ad69c4b5a 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -327,12 +327,16 @@ func TestHTTP_NodeEligible(t *testing.T) { respW := httptest.NewRecorder() // Make the request - _, err = s.Server.NodeSpecificRequest(respW, req) + obj, err := s.Server.NodeSpecificRequest(respW, req) require.Nil(err) // Check for the index require.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + // Check the response + _, ok := obj.(structs.NodeEligibilityUpdateResponse) + require.True(ok) + // Check that the node has been updated state := s.Agent.server.State() out, err := state.NodeByID(nil, node.ID) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 956974af8..63c776d53 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -442,8 +442,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, if err != nil { return err } - ws := memdb.NewWatchSet() - node, err := snap.NodeByID(ws, args.NodeID) + node, err := snap.NodeByID(nil, args.NodeID) if err != nil { return err } @@ -474,6 +473,18 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, } reply.NodeModifyIndex = index + // If the node is transistioning to be eligible, create Node evaluations + // because there may be a System job registered that should be evaluated. + if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil { + evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) + if err != nil { + n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) + return err + } + reply.EvalIDs = evalIDs + reply.EvalCreateIndex = evalIndex + } + // Set the reply index reply.Index = index return nil @@ -481,7 +492,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, // UpdateEligibility is used to update the scheduling eligibility of a node func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, - reply *structs.GenericResponse) error { + reply *structs.NodeEligibilityUpdateResponse) error { if done, err := n.srv.forward("Node.UpdateEligibility", args, args, reply); done { return err } @@ -499,13 +510,19 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, return fmt.Errorf("missing node ID for setting scheduling eligibility") } + // Check that only allowed types are set + switch args.Eligibility { + case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible: + default: + return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility) + } + // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } - ws := memdb.NewWatchSet() - node, err := snap.NodeByID(ws, args.NodeID) + node, err := snap.NodeByID(nil, args.NodeID) if err != nil { return err } @@ -536,6 +553,18 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, } } + // If the node is transistioning to be eligible, create Node evaluations + // because there may be a System job registered that should be evaluated. + if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible { + evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) + if err != nil { + n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) + return err + } + reply.EvalIDs = evalIDs + reply.EvalCreateIndex = evalIndex + } + // Set the reply index reply.Index = index return nil diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 5cace3140..c6aa59d76 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -845,10 +845,25 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { require.Nil(err) require.True(out.Drain) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) + // before+deadline should be before the forced deadline require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline)) + // now+deadline should be after the forced deadline require.True(time.Now().Add(strategy.Deadline).After(out.DrainStrategy.ForceDeadline)) + + // Register a system job + job := mock.SystemJob() + require.Nil(s1.State().UpsertJob(10, job)) + + // Update the eligibility and expect evals + dereg.DrainStrategy = nil + dereg.MarkEligible = true + var resp3 structs.NodeDrainUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp3)) + require.NotZero(resp3.Index) + require.NotZero(resp3.EvalCreateIndex) + require.Len(resp3.EvalIDs, 1) } func TestClientEndpoint_UpdateDrain_ACL(t *testing.T) { @@ -1062,20 +1077,34 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) { require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) // Update the eligibility - dereg := &structs.NodeUpdateEligibilityRequest{ + elig := &structs.NodeUpdateEligibilityRequest{ NodeID: node.ID, Eligibility: structs.NodeSchedulingIneligible, WriteRequest: structs.WriteRequest{Region: "global"}, } - var resp2 structs.GenericResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp2)) + var resp2 structs.NodeEligibilityUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp2)) require.NotZero(resp2.Index) + require.Zero(resp2.EvalCreateIndex) + require.Empty(resp2.EvalIDs) // Check for the node in the FSM state := s1.fsm.State() out, err := state.NodeByID(nil, node.ID) require.Nil(err) require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible) + + // Register a system job + job := mock.SystemJob() + require.Nil(s1.State().UpsertJob(10, job)) + + // Update the eligibility and expect evals + elig.Eligibility = structs.NodeSchedulingEligible + var resp3 structs.NodeEligibilityUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp3)) + require.NotZero(resp3.Index) + require.NotZero(resp3.EvalCreateIndex) + require.Len(resp3.EvalIDs, 1) } func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) { @@ -1103,7 +1132,7 @@ func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } { - var resp structs.GenericResponse + var resp structs.NodeEligibilityUpdateResponse err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp) require.NotNil(err, "RPC") require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) @@ -1112,14 +1141,14 @@ func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) { // Try with a valid token dereg.AuthToken = validToken.SecretID { - var resp structs.GenericResponse + var resp structs.NodeEligibilityUpdateResponse require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC") } // Try with a invalid token dereg.AuthToken = invalidToken.SecretID { - var resp structs.GenericResponse + var resp structs.NodeEligibilityUpdateResponse err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp) require.NotNil(err, "RPC") require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) @@ -1128,7 +1157,7 @@ func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) { // Try with a root token dereg.AuthToken = root.SecretID { - var resp structs.GenericResponse + var resp structs.NodeEligibilityUpdateResponse require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC") } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9e0db1659..691dcc5ff 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -929,12 +929,17 @@ type NodeUpdateResponse struct { // NodeDrainUpdateResponse is used to respond to a node drain update type NodeDrainUpdateResponse struct { NodeModifyIndex uint64 - QueryMeta - - // Deprecated in Nomad 0.8 as an evaluation is not immediately created but - // is instead handled by the drainer. EvalIDs []string EvalCreateIndex uint64 + WriteMeta +} + +// NodeEligibilityUpdateResponse is used to respond to a node eligibility update +type NodeEligibilityUpdateResponse struct { + NodeModifyIndex uint64 + EvalIDs []string + EvalCreateIndex uint64 + WriteMeta } // NodeAllocsResponse is used to return allocs for a single node