From aff85141ec5ecf4c880c839e71f76204d6b69918 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 6 Sep 2015 20:00:12 -0700 Subject: [PATCH] nomad: adding client drain endpoint --- nomad/client_endpoint.go | 53 +++++++++++++++++++++++++++++++++++ nomad/client_endpoint_test.go | 44 +++++++++++++++++++++++++++++ nomad/structs/structs.go | 8 ++++++ 3 files changed, 105 insertions(+) diff --git a/nomad/client_endpoint.go b/nomad/client_endpoint.go index 3a362ec80..dd58d9d5d 100644 --- a/nomad/client_endpoint.go +++ b/nomad/client_endpoint.go @@ -179,6 +179,59 @@ func (c *ClientEndpoint) UpdateStatus(args *structs.NodeUpdateStatusRequest, rep return nil } +// UpdateDrain is used to update the drain mode of a client node +func (c *ClientEndpoint) UpdateDrain(args *structs.NodeUpdateDrainRequest, + reply *structs.NodeDrainUpdateResponse) error { + if done, err := c.srv.forward("Client.UpdateDrain", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "update_drain"}, time.Now()) + + // Verify the arguments + if args.NodeID == "" { + return fmt.Errorf("missing node ID for drain update") + } + + // Look for the node + snap, err := c.srv.fsm.State().Snapshot() + if err != nil { + return err + } + node, err := snap.GetNodeByID(args.NodeID) + if err != nil { + return err + } + if node == nil { + return fmt.Errorf("node not found") + } + + // Commit this update via Raft + var index uint64 + if node.Drain != args.Drain { + _, index, err = c.srv.raftApply(structs.NodeUpdateDrainRequestType, args) + if err != nil { + c.srv.logger.Printf("[ERR] nomad.client: drain update failed: %v", err) + return err + } + reply.NodeModifyIndex = index + } + + // Check if we should trigger evaluations + if args.Drain { + evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index) + if err != nil { + c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) + return err + } + reply.EvalIDs = evalIDs + reply.EvalCreateIndex = evalIndex + } + + // Set the reply index + reply.Index = index + return nil +} + // Evaluate is used to force a re-evaluation of the node func (c *ClientEndpoint) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error { if done, err := c.srv.forward("Client.Evaluate", args, args, reply); done { diff --git a/nomad/client_endpoint_test.go b/nomad/client_endpoint_test.go index 6debab14c..448fcaa6d 100644 --- a/nomad/client_endpoint_test.go +++ b/nomad/client_endpoint_test.go @@ -195,6 +195,50 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) { } } +func TestClientEndpoint_UpdateDrain(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.NodeUpdateResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Update the status + dereg := &structs.NodeUpdateDrainRequest{ + NodeID: node.ID, + Drain: true, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + var resp2 structs.NodeDrainUpdateResponse + if err := msgpackrpc.CallWithCodec(codec, "Client.UpdateDrain", dereg, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index == 0 { + t.Fatalf("bad index: %d", resp2.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.GetNodeByID(node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if !out.Drain { + t.Fatalf("bad: %#v", out) + } +} + func TestClientEndpoint_GetNode(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7af1a253d..8412b9037 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -307,6 +307,14 @@ type NodeUpdateResponse struct { QueryMeta } +// NodeDrainUpdateResponse is used to respond to a node drain update +type NodeDrainUpdateResponse struct { + EvalIDs []string + EvalCreateIndex uint64 + NodeModifyIndex uint64 + QueryMeta +} + // NodeAllocsResponse is used to return allocs for a single node type NodeAllocsResponse struct { Allocs []*Allocation