nomad: FSM support for node updates

This commit is contained in:
Armon Dadgar
2015-07-03 18:41:36 -07:00
parent c569016b52
commit d565fb3b54
4 changed files with 212 additions and 3 deletions

View File

@@ -14,7 +14,7 @@ type Client struct {
}
// Register is used to upsert a client that is available for scheduling
func (c *Client) Register(args *structs.RegisterRequest, reply *structs.RegisterResponse) error {
func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericResponse) error {
if done, err := c.srv.forward("Client.Register", args, args, reply); done {
return err
}

View File

@@ -67,6 +67,12 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
}
switch msgType {
case structs.RegisterRequestType:
return n.decodeRegister(buf[1:], log.Index)
case structs.DeregisterRequestType:
return n.applyDeregister(buf[1:], log.Index)
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -76,6 +82,52 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
}
}
}
func (n *nomadFSM) decodeRegister(buf []byte, index uint64) interface{} {
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return n.applyRegister(&req, index)
}
func (n *nomadFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register"}, time.Now())
if err := n.state.RegisterNode(index, req.Node); err != nil {
n.logger.Printf("[ERR] nomad.fsm: RegisterNode failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeregisterNode(index, req.NodeID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeregisterNode failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_status_update"}, time.Now())
var req structs.UpdateStatusRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()

139
nomad/fsm_test.go Normal file
View File

@@ -0,0 +1,139 @@
package nomad
import (
"os"
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)
func testFSM(t *testing.T) *nomadFSM {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
if fsm == nil {
t.Fatalf("missing fsm")
}
return fsm
}
func makeLog(buf []byte) *raft.Log {
return &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: buf,
}
}
func TestFSM_RegisterNode(t *testing.T) {
fsm := testFSM(t)
req := structs.RegisterRequest{
Node: mockNode(),
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
node, err := fsm.State().GetNodeByID(req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("not found!")
}
if node.CreateIndex != 1 {
t.Fatalf("bad index: %d", node.CreateIndex)
}
}
func TestFSM_DeregisterNode(t *testing.T) {
fsm := testFSM(t)
node := mockNode()
req := structs.RegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.DeregisterRequest{
NodeID: node.ID,
}
buf, err = structs.Encode(structs.DeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
node, err = fsm.State().GetNodeByID(req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("node found!")
}
}
func TestFSM_UpdateNodeStatus(t *testing.T) {
fsm := testFSM(t)
node := mockNode()
req := structs.RegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.UpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
node, err = fsm.State().GetNodeByID(req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node.Status != structs.NodeStatusReady {
t.Fatalf("bad node: %#v", node)
}
}

View File

@@ -17,6 +17,8 @@ type MessageType uint8
const (
RegisterRequestType MessageType = iota
DeregisterRequestType
NodeUpdateStatusRequestType
)
const (
@@ -113,8 +115,24 @@ type RegisterRequest struct {
WriteRequest
}
// RegisterResponse is used to respond to a register request
type RegisterResponse struct {
// DeregisterRequest is used for Client.Deregister endpoint
// to deregister a node as being a schedulable entity.
type DeregisterRequest struct {
NodeID string
WriteRequest
}
// UpdateStatusRequest is used for Client.UpdateStatus endpoint
// to update the status of a node.
type UpdateStatusRequest struct {
NodeID string
Status string
WriteRequest
}
// GenericResponse is used to respond to a request where no
// specific response information is needed.
type GenericResponse struct {
WriteMeta
}