From 9f76b6a9a0ac9407ccfcbd42a7ea6ec51dd8d2d5 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 6 Sep 2015 19:55:38 -0700 Subject: [PATCH] nomad: adding FSM support for node drain update --- nomad/fsm.go | 16 ++++++++++++++++ nomad/fsm_test.go | 41 ++++++++++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 10 +++++++++- 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 24d08f6ea..94caf1b8a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -120,6 +120,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyDeregisterNode(buf[1:], log.Index) case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) + case structs.NodeUpdateDrainRequestType: + return n.applyDrainUpdate(buf[1:], log.Index) case structs.JobRegisterRequestType: return n.applyRegisterJob(buf[1:], log.Index) case structs.JobDeregisterRequestType: @@ -184,6 +186,20 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { return nil } +func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now()) + var req structs.NodeUpdateDrainRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpdateNodeDrain(index, req.NodeID, req.Drain); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpdateNodeDrain failed: %v", err) + return err + } + return nil +} + func (n *nomadFSM) applyRegisterJob(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now()) var req structs.JobRegisterRequest diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index fe714b7fb..278595bea 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -178,6 +178,47 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { } } +func TestFSM_UpdateNodeDrain(t *testing.T) { + fsm := testFSM(t) + + node := mock.Node() + req := structs.NodeRegisterRequest{ + Node: node, + } + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + req2 := structs.NodeUpdateDrainRequest{ + NodeID: node.ID, + Drain: true, + } + buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are NOT registered + node, err = fsm.State().GetNodeByID(req.Node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if !node.Drain { + t.Fatalf("bad node: %#v", node) + } +} + func TestFSM_RegisterJob(t *testing.T) { fsm := testFSM(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e1200bc91..7af1a253d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -19,6 +19,7 @@ const ( NodeRegisterRequestType MessageType = iota NodeDeregisterRequestType NodeUpdateStatusRequestType + NodeUpdateDrainRequestType JobRegisterRequestType JobDeregisterRequestType EvalUpdateRequestType @@ -128,7 +129,7 @@ type NodeDeregisterRequest struct { WriteRequest } -// UpdateStatusRequest is used for Client.UpdateStatus endpoint +// NodeUpdateStatusRequest is used for Client.UpdateStatus endpoint // to update the status of a node. type NodeUpdateStatusRequest struct { NodeID string @@ -136,6 +137,13 @@ type NodeUpdateStatusRequest struct { WriteRequest } +// NodeUpdateDrainRequest is used for updatin the drain status +type NodeUpdateDrainRequest struct { + NodeID string + Drain bool + WriteRequest +} + // NodeEvaluateRequest is used to re-evaluate the ndoe type NodeEvaluateRequest struct { NodeID string