Merge pull request #4056 from hashicorp/b-system-eligible

Create evals for system jobs when drain is unset
This commit is contained in:
Alex Dadgar
2018-03-27 16:25:57 -07:00
committed by GitHub
7 changed files with 120 additions and 33 deletions

View File

@@ -57,21 +57,31 @@ type NodeUpdateDrainRequest struct {
MarkEligible bool
}
// NodeDrainUpdateResponse is used to respond to a node drain update
type NodeDrainUpdateResponse struct {
NodeModifyIndex uint64
EvalIDs []string
EvalCreateIndex uint64
WriteMeta
}
// UpdateDrain is used to update the drain strategy for a given node. If
// markEligible is true and the drain is being removed, the node will be marked
// as having its scheduling being elibile
func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q *WriteOptions) (*WriteMeta, error) {
func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q *WriteOptions) (*NodeDrainUpdateResponse, error) {
req := &NodeUpdateDrainRequest{
NodeID: nodeID,
DrainSpec: spec,
MarkEligible: markEligible,
}
wm, err := n.client.write("/v1/node/"+nodeID+"/drain", req, nil, q)
var resp NodeDrainUpdateResponse
wm, err := n.client.write("/v1/node/"+nodeID+"/drain", req, &resp, q)
if err != nil {
return nil, err
}
return wm, nil
resp.WriteMeta = *wm
return &resp, nil
}
// NodeUpdateEligibilityRequest is used to update the drain specification for a node.
@@ -81,8 +91,16 @@ type NodeUpdateEligibilityRequest struct {
Eligibility string
}
// NodeEligibilityUpdateResponse is used to respond to a node eligibility update
type NodeEligibilityUpdateResponse struct {
NodeModifyIndex uint64
EvalIDs []string
EvalCreateIndex uint64
WriteMeta
}
// ToggleEligibility is used to update the scheduling eligibility of the node
func (n *Nodes) ToggleEligibility(nodeID string, eligible bool, q *WriteOptions) (*WriteMeta, error) {
func (n *Nodes) ToggleEligibility(nodeID string, eligible bool, q *WriteOptions) (*NodeEligibilityUpdateResponse, error) {
e := structs.NodeSchedulingEligible
if !eligible {
e = structs.NodeSchedulingIneligible
@@ -93,11 +111,13 @@ func (n *Nodes) ToggleEligibility(nodeID string, eligible bool, q *WriteOptions)
Eligibility: e,
}
wm, err := n.client.write("/v1/node/"+nodeID+"/eligibility", req, nil, q)
var resp NodeEligibilityUpdateResponse
wm, err := n.client.write("/v1/node/"+nodeID+"/eligibility", req, &resp, q)
if err != nil {
return nil, err
}
return wm, nil
resp.WriteMeta = *wm
return &resp, nil
}
// Allocations is used to return the allocations associated with a node.

View File

@@ -176,9 +176,9 @@ func TestNodes_ToggleDrain(t *testing.T) {
spec := &DrainSpec{
Deadline: 10 * time.Second,
}
wm, err := nodes.UpdateDrain(nodeID, spec, false, nil)
drainOut, err := nodes.UpdateDrain(nodeID, spec, false, nil)
require.Nil(err)
assertWriteMeta(t, wm)
assertWriteMeta(t, &drainOut.WriteMeta)
// Check again
out, _, err = nodes.Info(nodeID, nil)
@@ -188,9 +188,9 @@ func TestNodes_ToggleDrain(t *testing.T) {
}
// Toggle off again
wm, err = nodes.UpdateDrain(nodeID, nil, true, nil)
drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil)
require.Nil(err)
assertWriteMeta(t, wm)
assertWriteMeta(t, &drainOut.WriteMeta)
// Check again
out, _, err = nodes.Info(nodeID, nil)
@@ -240,11 +240,11 @@ func TestNodes_ToggleEligibility(t *testing.T) {
}
// Toggle it off
wm, err := nodes.ToggleEligibility(nodeID, false, nil)
eligOut, err := nodes.ToggleEligibility(nodeID, false, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm)
assertWriteMeta(t, &eligOut.WriteMeta)
// Check again
out, _, err = nodes.Info(nodeID, nil)
@@ -256,11 +256,11 @@ func TestNodes_ToggleEligibility(t *testing.T) {
}
// Toggle on
wm, err = nodes.ToggleEligibility(nodeID, true, nil)
eligOut, err = nodes.ToggleEligibility(nodeID, true, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm)
assertWriteMeta(t, &eligOut.WriteMeta)
// Check again
out, _, err = nodes.Info(nodeID, nil)

View File

@@ -165,12 +165,12 @@ func (s *HTTPServer) nodeToggleEligibility(resp http.ResponseWriter, req *http.R
}
s.parseWriteRequest(req, &drainRequest.WriteRequest)
var out structs.GenericResponse
var out structs.NodeEligibilityUpdateResponse
if err := s.agent.RPC("Node.UpdateEligibility", &drainRequest, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return nil, nil
return out, nil
}
func (s *HTTPServer) nodeQuery(resp http.ResponseWriter, req *http.Request,

View File

@@ -327,12 +327,16 @@ func TestHTTP_NodeEligible(t *testing.T) {
respW := httptest.NewRecorder()
// Make the request
_, err = s.Server.NodeSpecificRequest(respW, req)
obj, err := s.Server.NodeSpecificRequest(respW, req)
require.Nil(err)
// Check for the index
require.NotZero(respW.HeaderMap.Get("X-Nomad-Index"))
// Check the response
_, ok := obj.(structs.NodeEligibilityUpdateResponse)
require.True(ok)
// Check that the node has been updated
state := s.Agent.server.State()
out, err := state.NodeByID(nil, node.ID)

View File

@@ -442,8 +442,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
if err != nil {
return err
}
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
node, err := snap.NodeByID(nil, args.NodeID)
if err != nil {
return err
}
@@ -474,6 +473,18 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
}
reply.NodeModifyIndex = index
// If the node is transistioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
return err
}
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
}
// Set the reply index
reply.Index = index
return nil
@@ -481,7 +492,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
// UpdateEligibility is used to update the scheduling eligibility of a node
func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
reply *structs.GenericResponse) error {
reply *structs.NodeEligibilityUpdateResponse) error {
if done, err := n.srv.forward("Node.UpdateEligibility", args, args, reply); done {
return err
}
@@ -499,13 +510,19 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
return fmt.Errorf("missing node ID for setting scheduling eligibility")
}
// Check that only allowed types are set
switch args.Eligibility {
case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible:
default:
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
node, err := snap.NodeByID(nil, args.NodeID)
if err != nil {
return err
}
@@ -536,6 +553,18 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
}
}
// If the node is transistioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
return err
}
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
}
// Set the reply index
reply.Index = index
return nil

View File

@@ -845,10 +845,25 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
require.Nil(err)
require.True(out.Drain)
require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
// before+deadline should be before the forced deadline
require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline))
// now+deadline should be after the forced deadline
require.True(time.Now().Add(strategy.Deadline).After(out.DrainStrategy.ForceDeadline))
// Register a system job
job := mock.SystemJob()
require.Nil(s1.State().UpsertJob(10, job))
// Update the eligibility and expect evals
dereg.DrainStrategy = nil
dereg.MarkEligible = true
var resp3 structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp3))
require.NotZero(resp3.Index)
require.NotZero(resp3.EvalCreateIndex)
require.Len(resp3.EvalIDs, 1)
}
func TestClientEndpoint_UpdateDrain_ACL(t *testing.T) {
@@ -1062,20 +1077,34 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
// Update the eligibility
dereg := &structs.NodeUpdateEligibilityRequest{
elig := &structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingIneligible,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.GenericResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp2))
var resp2 structs.NodeEligibilityUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp2))
require.NotZero(resp2.Index)
require.Zero(resp2.EvalCreateIndex)
require.Empty(resp2.EvalIDs)
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.NodeByID(nil, node.ID)
require.Nil(err)
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible)
// Register a system job
job := mock.SystemJob()
require.Nil(s1.State().UpsertJob(10, job))
// Update the eligibility and expect evals
elig.Eligibility = structs.NodeSchedulingEligible
var resp3 structs.NodeEligibilityUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp3))
require.NotZero(resp3.Index)
require.NotZero(resp3.EvalCreateIndex)
require.Len(resp3.EvalIDs, 1)
}
func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
@@ -1103,7 +1132,7 @@ func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
WriteRequest: structs.WriteRequest{Region: "global"},
}
{
var resp structs.GenericResponse
var resp structs.NodeEligibilityUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp)
require.NotNil(err, "RPC")
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
@@ -1112,14 +1141,14 @@ func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
// Try with a valid token
dereg.AuthToken = validToken.SecretID
{
var resp structs.GenericResponse
var resp structs.NodeEligibilityUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC")
}
// Try with a invalid token
dereg.AuthToken = invalidToken.SecretID
{
var resp structs.GenericResponse
var resp structs.NodeEligibilityUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp)
require.NotNil(err, "RPC")
require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
@@ -1128,7 +1157,7 @@ func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
// Try with a root token
dereg.AuthToken = root.SecretID
{
var resp structs.GenericResponse
var resp structs.NodeEligibilityUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC")
}
}

View File

@@ -929,12 +929,17 @@ type NodeUpdateResponse struct {
// NodeDrainUpdateResponse is used to respond to a node drain update
type NodeDrainUpdateResponse struct {
NodeModifyIndex uint64
QueryMeta
// Deprecated in Nomad 0.8 as an evaluation is not immediately created but
// is instead handled by the drainer.
EvalIDs []string
EvalCreateIndex uint64
WriteMeta
}
// NodeEligibilityUpdateResponse is used to respond to a node eligibility update
type NodeEligibilityUpdateResponse struct {
NodeModifyIndex uint64
EvalIDs []string
EvalCreateIndex uint64
WriteMeta
}
// NodeAllocsResponse is used to return allocs for a single node