From 7ff9697b22bfb2690e91a7d24a40a014552bdbb5 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 6 Aug 2015 16:39:20 -0700 Subject: [PATCH] nomad: client updates generate evaluaitons --- nomad/client_endpoint.go | 111 +++++++++++++++++++++++++++++++--- nomad/client_endpoint_test.go | 60 ++++++++++++++++++ nomad/job_endpoint_test.go | 9 +++ nomad/state_store_test.go | 2 + nomad/structs/structs.go | 41 +++++++++++++ 5 files changed, 215 insertions(+), 8 deletions(-) diff --git a/nomad/client_endpoint.go b/nomad/client_endpoint.go index aecd16495..3b9abe338 100644 --- a/nomad/client_endpoint.go +++ b/nomad/client_endpoint.go @@ -14,7 +14,7 @@ type Client struct { } // Register is used to upsert a client that is available for scheduling -func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.GenericResponse) error { +func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.Register", args, args, reply); done { return err } @@ -38,6 +38,9 @@ func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.Gene if args.Node.Status == "" { args.Node.Status = structs.NodeStatusInit } + if !structs.ValidNodeStatus(args.Node.Status) { + return fmt.Errorf("invalid status for node") + } // Commit this update via Raft _, index, err := c.srv.raftApply(structs.NodeRegisterRequestType, args) @@ -45,6 +48,18 @@ func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.Gene c.srv.logger.Printf("[ERR] nomad.client: Register failed: %v", err) return err } + reply.NodeModifyIndex = index + + // Check if we should trigger evaluations + if structs.ShouldEvaluateNode(args.Node.Status) { + evalIDs, evalIndex, err := c.createNodeEvals(args.Node.ID, index) + if err != nil { + c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) + return err + } + reply.EvalIDs = evalIDs + reply.EvalCreateIndex = evalIndex + } // Set the reply index reply.Index = index @@ -53,7 +68,7 @@ func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.Gene // Deregister is used to remove a client from the client. If a client should // just be made unavailable for scheduling, a status update is prefered. -func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.GenericResponse) error { +func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.Deregister", args, args, reply); done { return err } @@ -71,13 +86,23 @@ func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs. return err } - // Set the reply index + // Create the evaluations for this node + evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index) + if err != nil { + c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) + return err + } + + // Setup the reply + reply.EvalIDs = evalIDs + reply.EvalCreateIndex = evalIndex + reply.NodeModifyIndex = index reply.Index = index return nil } // UpdateStatus is used to update the status of a client node -func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.GenericResponse) error { +func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.UpdateStatus", args, args, reply); done { return err } @@ -87,10 +112,7 @@ func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *stru if args.NodeID == "" { return fmt.Errorf("missing node ID for client deregistration") } - switch args.Status { - case structs.NodeStatusInit, structs.NodeStatusReady, - structs.NodeStatusMaint, structs.NodeStatusDown: - default: + if !structs.ValidNodeStatus(args.Status) { return fmt.Errorf("invalid status for node") } @@ -100,6 +122,18 @@ func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *stru c.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err) return err } + reply.NodeModifyIndex = index + + // Check if we should trigger evaluations + if structs.ShouldEvaluateNode(args.Status) { + evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index) + if err != nil { + c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) + return err + } + reply.EvalIDs = evalIDs + reply.EvalCreateIndex = evalIndex + } // Set the reply index reply.Index = index @@ -146,3 +180,64 @@ func (c *Client) GetNode(args *structs.NodeSpecificRequest, c.srv.setQueryMeta(&reply.QueryMeta) return nil } + +// createNodeEvals is used to create evaluations for each alloc on a node. +// Each Eval is scoped to a job, so we need to potentially trigger many evals. +func (c *Client) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) { + // Snapshot the state + snap, err := c.srv.fsm.State().Snapshot() + if err != nil { + return nil, 0, fmt.Errorf("failed to snapshot state: %v", err) + } + + // Find all the allocations for this node + allocs, err := snap.AllocsByNode(nodeID) + if err != nil { + return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err) + } + + // Fast-path if nothing to do + if len(allocs) == 0 { + return nil, 0, nil + } + + // Create an eval for each JobID affected + var evals []*structs.Evaluation + var evalIDs []string + jobIDs := make(map[string]struct{}) + + for _, alloc := range allocs { + // Deduplicate on JobID + if _, ok := jobIDs[alloc.JobID]; ok { + continue + } + jobIDs[alloc.JobID] = struct{}{} + + // Create a new eval + eval := &structs.Evaluation{ + ID: generateUUID(), + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: alloc.JobID, + NodeID: nodeID, + NodeModifyIndex: nodeIndex, + Status: structs.EvalStatusPending, + } + evals = append(evals, eval) + evalIDs = append(evalIDs, eval.ID) + } + + // Create the Raft transaction + update := &structs.EvalUpdateRequest{ + Evals: evals, + WriteRequest: structs.WriteRequest{Region: c.srv.config.Region}, + } + + // Commit this evaluation via Raft + _, evalIndex, err := c.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + return nil, 0, err + } + return evalIDs, evalIndex, nil +} diff --git a/nomad/client_endpoint_test.go b/nomad/client_endpoint_test.go index f56cc415d..a89b69449 100644 --- a/nomad/client_endpoint_test.go +++ b/nomad/client_endpoint_test.go @@ -185,3 +185,63 @@ func TestClientEndpoint_GetNode(t *testing.T) { t.Fatalf("unexpected node") } } + +func TestClientEndpoint_CreateNodeEvals(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Inject fake evaluations + alloc := mockAlloc() + state := s1.fsm.State() + err := state.UpdateAllocations(1, nil, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create some evaluations + ids, index, err := s1.endpoints.Client.createNodeEvals(alloc.NodeID, 1) + if err != nil { + t.Fatalf("err: %v", err) + } + if index == 0 { + t.Fatalf("bad: %d", index) + } + if len(ids) != 1 { + t.Fatalf("bad: %s", ids) + } + + // Lookup the evaluation + eval, err := state.GetEvalByID(ids[0]) + if err != nil { + t.Fatalf("err: %v", err) + } + if eval == nil { + t.Fatalf("expected eval") + } + if eval.CreateIndex != index { + t.Fatalf("index mis-match") + } + + if eval.Priority != alloc.Job.Priority { + t.Fatalf("bad: %#v", eval) + } + if eval.Type != alloc.Job.Type { + t.Fatalf("bad: %#v", eval) + } + if eval.TriggeredBy != structs.EvalTriggerNodeUpdate { + t.Fatalf("bad: %#v", eval) + } + if eval.JobID != alloc.JobID { + t.Fatalf("bad: %#v", eval) + } + if eval.NodeID != alloc.NodeID { + t.Fatalf("bad: %#v", eval) + } + if eval.NodeModifyIndex != 1 { + t.Fatalf("bad: %#v", eval) + } + if eval.Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", eval) + } +} diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index f14cab699..2541e5f80 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -71,6 +71,9 @@ func TestJobEndpoint_Register(t *testing.T) { if eval.JobModifyIndex != resp.JobModifyIndex { t.Fatalf("bad: %#v", eval) } + if eval.Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", eval) + } } func TestJobEndpoint_Register_Existing(t *testing.T) { @@ -152,6 +155,9 @@ func TestJobEndpoint_Register_Existing(t *testing.T) { if eval.JobModifyIndex != resp.JobModifyIndex { t.Fatalf("bad: %#v", eval) } + if eval.Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", eval) + } } func TestJobEndpoint_Deregister(t *testing.T) { @@ -223,6 +229,9 @@ func TestJobEndpoint_Deregister(t *testing.T) { if eval.JobModifyIndex != resp2.JobModifyIndex { t.Fatalf("bad: %#v", eval) } + if eval.Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", eval) + } } func TestJobEndpoint_GetJob(t *testing.T) { diff --git a/nomad/state_store_test.go b/nomad/state_store_test.go index 69a84e18b..b52db81a1 100644 --- a/nomad/state_store_test.go +++ b/nomad/state_store_test.go @@ -139,7 +139,9 @@ func mockAlloc() *structs.Allocation { }, }, }, + Job: mockJob(), } + alloc.JobID = alloc.Job.ID return alloc } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a7546b581..d1bb8c91f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -225,6 +225,14 @@ type JobDeregisterResponse struct { QueryMeta } +// NodeUpdateResponse is used to respond to a node update +type NodeUpdateResponse struct { + EvalIDs []string + EvalCreateIndex uint64 + NodeModifyIndex uint64 + QueryMeta +} + // SingleNodeResponse is used to return a single node type SingleNodeResponse struct { Node *Node @@ -253,9 +261,34 @@ const ( NodeStatusInit = "initializing" NodeStatusReady = "ready" NodeStatusMaint = "maintenance" + NodeStatusDrain = "drain" NodeStatusDown = "down" ) +// ShouldEvaluateNode checks if a given node status should trigger an +// evaluation. Some states don't require any further action. +func ShouldEvaluateNode(status string) bool { + switch status { + case NodeStatusInit, NodeStatusReady, NodeStatusMaint: + return false + case NodeStatusDrain, NodeStatusDown: + return true + default: + panic(fmt.Sprintf("unhandled node status %s", status)) + } +} + +// ValidNodeStatus is used to check if a node status is valid +func ValidNodeStatus(status string) bool { + switch status { + case NodeStatusInit, NodeStatusReady, + NodeStatusMaint, NodeStatusDrain, NodeStatusDown: + return true + default: + return false + } +} + // Node is a representation of a schedulable client node type Node struct { // ID is a unique identifier for the node. It can be constructed @@ -624,6 +657,7 @@ const ( const ( EvalTriggerJobRegister = "job-register" EvalTriggerJobDeregister = "job-deregister" + EvalTriggerNodeUpdate = "node-update" ) // Evaluation is used anytime we need to apply business logic as a result @@ -656,6 +690,13 @@ type Evaluation struct { // the evaluation was created JobModifyIndex uint64 + // NodeID is the node that was affected triggering the evaluation. + NodeID string + + // NodeModifyIndex is the modify index of the node at the time + // the evaluation was created + NodeModifyIndex uint64 + // Status of the evaluation Status string