nomad: client updates generate evaluaitons

This commit is contained in:
Armon Dadgar
2015-08-06 16:39:20 -07:00
parent 91547572dd
commit 7ff9697b22
5 changed files with 215 additions and 8 deletions

View File

@@ -14,7 +14,7 @@ type Client struct {
}
// Register is used to upsert a client that is available for scheduling
func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.GenericResponse) error {
func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := c.srv.forward("Client.Register", args, args, reply); done {
return err
}
@@ -38,6 +38,9 @@ func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.Gene
if args.Node.Status == "" {
args.Node.Status = structs.NodeStatusInit
}
if !structs.ValidNodeStatus(args.Node.Status) {
return fmt.Errorf("invalid status for node")
}
// Commit this update via Raft
_, index, err := c.srv.raftApply(structs.NodeRegisterRequestType, args)
@@ -45,6 +48,18 @@ func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.Gene
c.srv.logger.Printf("[ERR] nomad.client: Register failed: %v", err)
return err
}
reply.NodeModifyIndex = index
// Check if we should trigger evaluations
if structs.ShouldEvaluateNode(args.Node.Status) {
evalIDs, evalIndex, err := c.createNodeEvals(args.Node.ID, index)
if err != nil {
c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
return err
}
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
}
// Set the reply index
reply.Index = index
@@ -53,7 +68,7 @@ func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.Gene
// Deregister is used to remove a client from the client. If a client should
// just be made unavailable for scheduling, a status update is prefered.
func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.GenericResponse) error {
func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := c.srv.forward("Client.Deregister", args, args, reply); done {
return err
}
@@ -71,13 +86,23 @@ func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.
return err
}
// Set the reply index
// Create the evaluations for this node
evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index)
if err != nil {
c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
return err
}
// Setup the reply
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
reply.NodeModifyIndex = index
reply.Index = index
return nil
}
// UpdateStatus is used to update the status of a client node
func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.GenericResponse) error {
func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error {
if done, err := c.srv.forward("Client.UpdateStatus", args, args, reply); done {
return err
}
@@ -87,10 +112,7 @@ func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *stru
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}
switch args.Status {
case structs.NodeStatusInit, structs.NodeStatusReady,
structs.NodeStatusMaint, structs.NodeStatusDown:
default:
if !structs.ValidNodeStatus(args.Status) {
return fmt.Errorf("invalid status for node")
}
@@ -100,6 +122,18 @@ func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *stru
c.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err)
return err
}
reply.NodeModifyIndex = index
// Check if we should trigger evaluations
if structs.ShouldEvaluateNode(args.Status) {
evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index)
if err != nil {
c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
return err
}
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
}
// Set the reply index
reply.Index = index
@@ -146,3 +180,64 @@ func (c *Client) GetNode(args *structs.NodeSpecificRequest,
c.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// createNodeEvals is used to create evaluations for each alloc on a node.
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
func (c *Client) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) {
// Snapshot the state
snap, err := c.srv.fsm.State().Snapshot()
if err != nil {
return nil, 0, fmt.Errorf("failed to snapshot state: %v", err)
}
// Find all the allocations for this node
allocs, err := snap.AllocsByNode(nodeID)
if err != nil {
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
}
// Fast-path if nothing to do
if len(allocs) == 0 {
return nil, 0, nil
}
// Create an eval for each JobID affected
var evals []*structs.Evaluation
var evalIDs []string
jobIDs := make(map[string]struct{})
for _, alloc := range allocs {
// Deduplicate on JobID
if _, ok := jobIDs[alloc.JobID]; ok {
continue
}
jobIDs[alloc.JobID] = struct{}{}
// Create a new eval
eval := &structs.Evaluation{
ID: generateUUID(),
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: alloc.JobID,
NodeID: nodeID,
NodeModifyIndex: nodeIndex,
Status: structs.EvalStatusPending,
}
evals = append(evals, eval)
evalIDs = append(evalIDs, eval.ID)
}
// Create the Raft transaction
update := &structs.EvalUpdateRequest{
Evals: evals,
WriteRequest: structs.WriteRequest{Region: c.srv.config.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := c.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
return nil, 0, err
}
return evalIDs, evalIndex, nil
}

View File

@@ -185,3 +185,63 @@ func TestClientEndpoint_GetNode(t *testing.T) {
t.Fatalf("unexpected node")
}
}
func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Inject fake evaluations
alloc := mockAlloc()
state := s1.fsm.State()
err := state.UpdateAllocations(1, nil, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create some evaluations
ids, index, err := s1.endpoints.Client.createNodeEvals(alloc.NodeID, 1)
if err != nil {
t.Fatalf("err: %v", err)
}
if index == 0 {
t.Fatalf("bad: %d", index)
}
if len(ids) != 1 {
t.Fatalf("bad: %s", ids)
}
// Lookup the evaluation
eval, err := state.GetEvalByID(ids[0])
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}
if eval.CreateIndex != index {
t.Fatalf("index mis-match")
}
if eval.Priority != alloc.Job.Priority {
t.Fatalf("bad: %#v", eval)
}
if eval.Type != alloc.Job.Type {
t.Fatalf("bad: %#v", eval)
}
if eval.TriggeredBy != structs.EvalTriggerNodeUpdate {
t.Fatalf("bad: %#v", eval)
}
if eval.JobID != alloc.JobID {
t.Fatalf("bad: %#v", eval)
}
if eval.NodeID != alloc.NodeID {
t.Fatalf("bad: %#v", eval)
}
if eval.NodeModifyIndex != 1 {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
}

View File

@@ -71,6 +71,9 @@ func TestJobEndpoint_Register(t *testing.T) {
if eval.JobModifyIndex != resp.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
}
func TestJobEndpoint_Register_Existing(t *testing.T) {
@@ -152,6 +155,9 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
if eval.JobModifyIndex != resp.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
}
func TestJobEndpoint_Deregister(t *testing.T) {
@@ -223,6 +229,9 @@ func TestJobEndpoint_Deregister(t *testing.T) {
if eval.JobModifyIndex != resp2.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
}
func TestJobEndpoint_GetJob(t *testing.T) {

View File

@@ -139,7 +139,9 @@ func mockAlloc() *structs.Allocation {
},
},
},
Job: mockJob(),
}
alloc.JobID = alloc.Job.ID
return alloc
}

View File

@@ -225,6 +225,14 @@ type JobDeregisterResponse struct {
QueryMeta
}
// NodeUpdateResponse is used to respond to a node update
type NodeUpdateResponse struct {
EvalIDs []string
EvalCreateIndex uint64
NodeModifyIndex uint64
QueryMeta
}
// SingleNodeResponse is used to return a single node
type SingleNodeResponse struct {
Node *Node
@@ -253,9 +261,34 @@ const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
NodeStatusMaint = "maintenance"
NodeStatusDrain = "drain"
NodeStatusDown = "down"
)
// ShouldEvaluateNode checks if a given node status should trigger an
// evaluation. Some states don't require any further action.
func ShouldEvaluateNode(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady, NodeStatusMaint:
return false
case NodeStatusDrain, NodeStatusDown:
return true
default:
panic(fmt.Sprintf("unhandled node status %s", status))
}
}
// ValidNodeStatus is used to check if a node status is valid
func ValidNodeStatus(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady,
NodeStatusMaint, NodeStatusDrain, NodeStatusDown:
return true
default:
return false
}
}
// Node is a representation of a schedulable client node
type Node struct {
// ID is a unique identifier for the node. It can be constructed
@@ -624,6 +657,7 @@ const (
const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerNodeUpdate = "node-update"
)
// Evaluation is used anytime we need to apply business logic as a result
@@ -656,6 +690,13 @@ type Evaluation struct {
// the evaluation was created
JobModifyIndex uint64
// NodeID is the node that was affected triggering the evaluation.
NodeID string
// NodeModifyIndex is the modify index of the node at the time
// the evaluation was created
NodeModifyIndex uint64
// Status of the evaluation
Status string