From d565fb3b54f1594dbdf87a75dce137e27df9eb32 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 3 Jul 2015 18:41:36 -0700 Subject: [PATCH] nomad: FSM support for node updates --- nomad/client_endpoint.go | 2 +- nomad/fsm.go | 52 +++++++++++++++ nomad/fsm_test.go | 139 +++++++++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 22 ++++++- 4 files changed, 212 insertions(+), 3 deletions(-) create mode 100644 nomad/fsm_test.go diff --git a/nomad/client_endpoint.go b/nomad/client_endpoint.go index fb2477e38..8bfc62560 100644 --- a/nomad/client_endpoint.go +++ b/nomad/client_endpoint.go @@ -14,7 +14,7 @@ type Client struct { } // Register is used to upsert a client that is available for scheduling -func (c *Client) Register(args *structs.RegisterRequest, reply *structs.RegisterResponse) error { +func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericResponse) error { if done, err := c.srv.forward("Client.Register", args, args, reply); done { return err } diff --git a/nomad/fsm.go b/nomad/fsm.go index 52c9fbfc2..1a9b72123 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -67,6 +67,12 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { } switch msgType { + case structs.RegisterRequestType: + return n.decodeRegister(buf[1:], log.Index) + case structs.DeregisterRequestType: + return n.applyDeregister(buf[1:], log.Index) + case structs.NodeUpdateStatusRequestType: + return n.applyStatusUpdate(buf[1:], log.Index) default: if ignoreUnknown { n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -76,6 +82,52 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { } } } + +func (n *nomadFSM) decodeRegister(buf []byte, index uint64) interface{} { + var req structs.RegisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + return n.applyRegister(&req, index) +} + +func (n *nomadFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "register"}, time.Now()) + if err := n.state.RegisterNode(index, req.Node); err != nil { + n.logger.Printf("[ERR] nomad.fsm: RegisterNode failed: %v", err) + return err + } + return nil +} + +func (n *nomadFSM) applyDeregister(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister"}, time.Now()) + var req structs.DeregisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeregisterNode(index, req.NodeID); err != nil { + n.logger.Printf("[ERR] nomad.fsm: DeregisterNode failed: %v", err) + return err + } + return nil +} + +func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "node_status_update"}, time.Now()) + var req structs.UpdateStatusRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err) + return err + } + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go new file mode 100644 index 000000000..a2376ea68 --- /dev/null +++ b/nomad/fsm_test.go @@ -0,0 +1,139 @@ +package nomad + +import ( + "os" + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/raft" +) + +func testFSM(t *testing.T) *nomadFSM { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + if fsm == nil { + t.Fatalf("missing fsm") + } + return fsm +} + +func makeLog(buf []byte) *raft.Log { + return &raft.Log{ + Index: 1, + Term: 1, + Type: raft.LogCommand, + Data: buf, + } +} + +func TestFSM_RegisterNode(t *testing.T) { + fsm := testFSM(t) + + req := structs.RegisterRequest{ + Node: mockNode(), + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + node, err := fsm.State().GetNodeByID(req.Node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if node == nil { + t.Fatalf("not found!") + } + if node.CreateIndex != 1 { + t.Fatalf("bad index: %d", node.CreateIndex) + } +} + +func TestFSM_DeregisterNode(t *testing.T) { + fsm := testFSM(t) + + node := mockNode() + req := structs.RegisterRequest{ + Node: node, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + req2 := structs.DeregisterRequest{ + NodeID: node.ID, + } + buf, err = structs.Encode(structs.DeregisterRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are NOT registered + node, err = fsm.State().GetNodeByID(req.Node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if node != nil { + t.Fatalf("node found!") + } +} + +func TestFSM_UpdateNodeStatus(t *testing.T) { + fsm := testFSM(t) + + node := mockNode() + req := structs.RegisterRequest{ + Node: node, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + req2 := structs.UpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusReady, + } + buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are NOT registered + node, err = fsm.State().GetNodeByID(req.Node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if node.Status != structs.NodeStatusReady { + t.Fatalf("bad node: %#v", node) + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 62644468b..e2bc1763a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -17,6 +17,8 @@ type MessageType uint8 const ( RegisterRequestType MessageType = iota + DeregisterRequestType + NodeUpdateStatusRequestType ) const ( @@ -113,8 +115,24 @@ type RegisterRequest struct { WriteRequest } -// RegisterResponse is used to respond to a register request -type RegisterResponse struct { +// DeregisterRequest is used for Client.Deregister endpoint +// to deregister a node as being a schedulable entity. +type DeregisterRequest struct { + NodeID string + WriteRequest +} + +// UpdateStatusRequest is used for Client.UpdateStatus endpoint +// to update the status of a node. +type UpdateStatusRequest struct { + NodeID string + Status string + WriteRequest +} + +// GenericResponse is used to respond to a request where no +// specific response information is needed. +type GenericResponse struct { WriteMeta }