diff --git a/nomad/client_endpoint.go b/nomad/client_endpoint.go index 611d2bbd6..aecd16495 100644 --- a/nomad/client_endpoint.go +++ b/nomad/client_endpoint.go @@ -14,7 +14,7 @@ type Client struct { } // Register is used to upsert a client that is available for scheduling -func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericResponse) error { +func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.GenericResponse) error { if done, err := c.srv.forward("Client.Register", args, args, reply); done { return err } @@ -40,7 +40,7 @@ func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericR } // Commit this update via Raft - _, index, err := c.srv.raftApply(structs.RegisterRequestType, args) + _, index, err := c.srv.raftApply(structs.NodeRegisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: Register failed: %v", err) return err @@ -53,7 +53,7 @@ func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericR // Deregister is used to remove a client from the client. If a client should // just be made unavailable for scheduling, a status update is prefered. -func (c *Client) Deregister(args *structs.DeregisterRequest, reply *structs.GenericResponse) error { +func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.GenericResponse) error { if done, err := c.srv.forward("Client.Deregister", args, args, reply); done { return err } @@ -65,7 +65,7 @@ func (c *Client) Deregister(args *structs.DeregisterRequest, reply *structs.Gene } // Commit this update via Raft - _, index, err := c.srv.raftApply(structs.DeregisterRequestType, args) + _, index, err := c.srv.raftApply(structs.NodeDeregisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] nomad.client: Deregister failed: %v", err) return err @@ -77,7 +77,7 @@ func (c *Client) Deregister(args *structs.DeregisterRequest, reply *structs.Gene } // UpdateStatus is used to update the status of a client node -func (c *Client) UpdateStatus(args *structs.UpdateStatusRequest, reply *structs.GenericResponse) error { +func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.GenericResponse) error { if done, err := c.srv.forward("Client.UpdateStatus", args, args, reply); done { return err } diff --git a/nomad/client_endpoint_test.go b/nomad/client_endpoint_test.go index 9e6a4d6b2..f56cc415d 100644 --- a/nomad/client_endpoint_test.go +++ b/nomad/client_endpoint_test.go @@ -17,7 +17,7 @@ func TestClientEndpoint_Register(t *testing.T) { // Create the register request node := mockNode() - req := &structs.RegisterRequest{ + req := &structs.NodeRegisterRequest{ Node: node, WriteRequest: structs.WriteRequest{Region: "region1"}, } @@ -53,7 +53,7 @@ func TestClientEndpoint_Deregister(t *testing.T) { // Create the register request node := mockNode() - reg := &structs.RegisterRequest{ + reg := &structs.NodeRegisterRequest{ Node: node, WriteRequest: structs.WriteRequest{Region: "region1"}, } @@ -65,7 +65,7 @@ func TestClientEndpoint_Deregister(t *testing.T) { } // Deregister - dereg := &structs.DeregisterRequest{ + dereg := &structs.NodeDeregisterRequest{ NodeID: node.ID, WriteRequest: structs.WriteRequest{Region: "region1"}, } @@ -96,7 +96,7 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) { // Create the register request node := mockNode() - reg := &structs.RegisterRequest{ + reg := &structs.NodeRegisterRequest{ Node: node, WriteRequest: structs.WriteRequest{Region: "region1"}, } @@ -108,7 +108,7 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) { } // Update the status - dereg := &structs.UpdateStatusRequest{ + dereg := &structs.NodeUpdateStatusRequest{ NodeID: node.ID, Status: structs.NodeStatusReady, WriteRequest: structs.WriteRequest{Region: "region1"}, @@ -143,7 +143,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Create the register request node := mockNode() - reg := &structs.RegisterRequest{ + reg := &structs.NodeRegisterRequest{ Node: node, WriteRequest: structs.WriteRequest{Region: "region1"}, } diff --git a/nomad/fsm.go b/nomad/fsm.go index 5247223bd..1c7c69dd5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -88,12 +88,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { } switch msgType { - case structs.RegisterRequestType: - return n.decodeRegister(buf[1:], log.Index) - case structs.DeregisterRequestType: - return n.applyDeregister(buf[1:], log.Index) + case structs.NodeRegisterRequestType: + return n.applyRegisterNode(buf[1:], log.Index) + case structs.NodeDeregisterRequestType: + return n.applyDeregisterNode(buf[1:], log.Index) case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) + case structs.JobRegisterRequestType: + return n.applyRegisterJob(buf[1:], log.Index) + case structs.JobDeregisterRequestType: + return n.applyDeregisterJob(buf[1:], log.Index) default: if ignoreUnknown { n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -104,16 +108,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { } } -func (n *nomadFSM) decodeRegister(buf []byte, index uint64) interface{} { - var req structs.RegisterRequest +func (n *nomadFSM) applyRegisterNode(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now()) + var req structs.NodeRegisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - return n.applyRegister(&req, index) -} -func (n *nomadFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { - defer metrics.MeasureSince([]string{"nomad", "fsm", "register"}, time.Now()) if err := n.state.RegisterNode(index, req.Node); err != nil { n.logger.Printf("[ERR] nomad.fsm: RegisterNode failed: %v", err) return err @@ -121,9 +122,9 @@ func (n *nomadFSM) applyRegister(req *structs.RegisterRequest, index uint64) int return nil } -func (n *nomadFSM) applyDeregister(buf []byte, index uint64) interface{} { - defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister"}, time.Now()) - var req structs.DeregisterRequest +func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now()) + var req structs.NodeDeregisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } @@ -137,7 +138,7 @@ func (n *nomadFSM) applyDeregister(buf []byte, index uint64) interface{} { func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "node_status_update"}, time.Now()) - var req structs.UpdateStatusRequest + var req structs.NodeUpdateStatusRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } @@ -149,6 +150,34 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { return nil } +func (n *nomadFSM) applyRegisterJob(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now()) + var req structs.JobRegisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.RegisterJob(index, req.Job); err != nil { + n.logger.Printf("[ERR] nomad.fsm: RegisterJob failed: %v", err) + return err + } + return nil +} + +func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_job"}, time.Now()) + var req structs.JobDeregisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeregisterJob(index, req.JobName); err != nil { + n.logger.Printf("[ERR] nomad.fsm: DeregisterJob failed: %v", err) + return err + } + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 82db0fcc8..609fa9fff 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -51,10 +51,10 @@ func makeLog(buf []byte) *raft.Log { func TestFSM_RegisterNode(t *testing.T) { fsm := testFSM(t) - req := structs.RegisterRequest{ + req := structs.NodeRegisterRequest{ Node: mockNode(), } - buf, err := structs.Encode(structs.RegisterRequestType, req) + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -81,10 +81,10 @@ func TestFSM_DeregisterNode(t *testing.T) { fsm := testFSM(t) node := mockNode() - req := structs.RegisterRequest{ + req := structs.NodeRegisterRequest{ Node: node, } - buf, err := structs.Encode(structs.RegisterRequestType, req) + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -94,10 +94,10 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("resp: %v", resp) } - req2 := structs.DeregisterRequest{ + req2 := structs.NodeDeregisterRequest{ NodeID: node.ID, } - buf, err = structs.Encode(structs.DeregisterRequestType, req2) + buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -121,10 +121,10 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { fsm := testFSM(t) node := mockNode() - req := structs.RegisterRequest{ + req := structs.NodeRegisterRequest{ Node: node, } - buf, err := structs.Encode(structs.RegisterRequestType, req) + buf, err := structs.Encode(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -134,7 +134,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { t.Fatalf("resp: %v", resp) } - req2 := structs.UpdateStatusRequest{ + req2 := structs.NodeUpdateStatusRequest{ NodeID: node.ID, Status: structs.NodeStatusReady, } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index cf911650b..1201c6839 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -16,9 +16,11 @@ var ( type MessageType uint8 const ( - RegisterRequestType MessageType = iota - DeregisterRequestType + NodeRegisterRequestType MessageType = iota + NodeDeregisterRequestType NodeUpdateStatusRequestType + JobRegisterRequestType + JobDeregisterRequestType ) const ( @@ -108,23 +110,23 @@ type WriteMeta struct { Index uint64 } -// RegisterRequest is used for Client.Register endpoint +// NodeRegisterRequest is used for Client.Register endpoint // to register a node as being a schedulable entity. -type RegisterRequest struct { +type NodeRegisterRequest struct { Node *Node WriteRequest } -// DeregisterRequest is used for Client.Deregister endpoint +// NodeDeregisterRequest is used for Client.Deregister endpoint // to deregister a node as being a schedulable entity. -type DeregisterRequest struct { +type NodeDeregisterRequest struct { NodeID string WriteRequest } // UpdateStatusRequest is used for Client.UpdateStatus endpoint // to update the status of a node. -type UpdateStatusRequest struct { +type NodeUpdateStatusRequest struct { NodeID string Status string WriteRequest @@ -136,6 +138,20 @@ type NodeSpecificRequest struct { WriteRequest } +// JobRegisterRequest is used for Job.Register endpoint +// to register a job as being a schedulable entity. +type JobRegisterRequest struct { + Job *Job + WriteRequest +} + +// JobDeregisterRequest is used for Job.Deregister endpoint +// to deregister a job as being a schedulable entity. +type JobDeregisterRequest struct { + JobName string + WriteRequest +} + // GenericResponse is used to respond to a request where no // specific response information is needed. type GenericResponse struct {