mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
nomad: make struct names more specific to avoid confusion
This commit is contained in:
@@ -14,7 +14,7 @@ type Client struct {
|
||||
}
|
||||
|
||||
// Register is used to upsert a client that is available for scheduling
|
||||
func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericResponse) error {
|
||||
func (c *Client) Register(args *structs.NodeRegisterRequest, reply *structs.GenericResponse) error {
|
||||
if done, err := c.srv.forward("Client.Register", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
@@ -40,7 +40,7 @@ func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericR
|
||||
}
|
||||
|
||||
// Commit this update via Raft
|
||||
_, index, err := c.srv.raftApply(structs.RegisterRequestType, args)
|
||||
_, index, err := c.srv.raftApply(structs.NodeRegisterRequestType, args)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] nomad.client: Register failed: %v", err)
|
||||
return err
|
||||
@@ -53,7 +53,7 @@ func (c *Client) Register(args *structs.RegisterRequest, reply *structs.GenericR
|
||||
|
||||
// Deregister is used to remove a client from the client. If a client should
|
||||
// just be made unavailable for scheduling, a status update is prefered.
|
||||
func (c *Client) Deregister(args *structs.DeregisterRequest, reply *structs.GenericResponse) error {
|
||||
func (c *Client) Deregister(args *structs.NodeDeregisterRequest, reply *structs.GenericResponse) error {
|
||||
if done, err := c.srv.forward("Client.Deregister", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
@@ -65,7 +65,7 @@ func (c *Client) Deregister(args *structs.DeregisterRequest, reply *structs.Gene
|
||||
}
|
||||
|
||||
// Commit this update via Raft
|
||||
_, index, err := c.srv.raftApply(structs.DeregisterRequestType, args)
|
||||
_, index, err := c.srv.raftApply(structs.NodeDeregisterRequestType, args)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] nomad.client: Deregister failed: %v", err)
|
||||
return err
|
||||
@@ -77,7 +77,7 @@ func (c *Client) Deregister(args *structs.DeregisterRequest, reply *structs.Gene
|
||||
}
|
||||
|
||||
// UpdateStatus is used to update the status of a client node
|
||||
func (c *Client) UpdateStatus(args *structs.UpdateStatusRequest, reply *structs.GenericResponse) error {
|
||||
func (c *Client) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.GenericResponse) error {
|
||||
if done, err := c.srv.forward("Client.UpdateStatus", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ func TestClientEndpoint_Register(t *testing.T) {
|
||||
|
||||
// Create the register request
|
||||
node := mockNode()
|
||||
req := &structs.RegisterRequest{
|
||||
req := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
@@ -53,7 +53,7 @@ func TestClientEndpoint_Deregister(t *testing.T) {
|
||||
|
||||
// Create the register request
|
||||
node := mockNode()
|
||||
reg := &structs.RegisterRequest{
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
@@ -65,7 +65,7 @@ func TestClientEndpoint_Deregister(t *testing.T) {
|
||||
}
|
||||
|
||||
// Deregister
|
||||
dereg := &structs.DeregisterRequest{
|
||||
dereg := &structs.NodeDeregisterRequest{
|
||||
NodeID: node.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
@@ -96,7 +96,7 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
|
||||
|
||||
// Create the register request
|
||||
node := mockNode()
|
||||
reg := &structs.RegisterRequest{
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
@@ -108,7 +108,7 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
|
||||
}
|
||||
|
||||
// Update the status
|
||||
dereg := &structs.UpdateStatusRequest{
|
||||
dereg := &structs.NodeUpdateStatusRequest{
|
||||
NodeID: node.ID,
|
||||
Status: structs.NodeStatusReady,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
@@ -143,7 +143,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
|
||||
|
||||
// Create the register request
|
||||
node := mockNode()
|
||||
reg := &structs.RegisterRequest{
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
|
||||
57
nomad/fsm.go
57
nomad/fsm.go
@@ -88,12 +88,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
|
||||
}
|
||||
|
||||
switch msgType {
|
||||
case structs.RegisterRequestType:
|
||||
return n.decodeRegister(buf[1:], log.Index)
|
||||
case structs.DeregisterRequestType:
|
||||
return n.applyDeregister(buf[1:], log.Index)
|
||||
case structs.NodeRegisterRequestType:
|
||||
return n.applyRegisterNode(buf[1:], log.Index)
|
||||
case structs.NodeDeregisterRequestType:
|
||||
return n.applyDeregisterNode(buf[1:], log.Index)
|
||||
case structs.NodeUpdateStatusRequestType:
|
||||
return n.applyStatusUpdate(buf[1:], log.Index)
|
||||
case structs.JobRegisterRequestType:
|
||||
return n.applyRegisterJob(buf[1:], log.Index)
|
||||
case structs.JobDeregisterRequestType:
|
||||
return n.applyDeregisterJob(buf[1:], log.Index)
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
@@ -104,16 +108,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
func (n *nomadFSM) decodeRegister(buf []byte, index uint64) interface{} {
|
||||
var req structs.RegisterRequest
|
||||
func (n *nomadFSM) applyRegisterNode(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
|
||||
var req structs.NodeRegisterRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
return n.applyRegister(&req, index)
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "register"}, time.Now())
|
||||
if err := n.state.RegisterNode(index, req.Node); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: RegisterNode failed: %v", err)
|
||||
return err
|
||||
@@ -121,9 +122,9 @@ func (n *nomadFSM) applyRegister(req *structs.RegisterRequest, index uint64) int
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister"}, time.Now())
|
||||
var req structs.DeregisterRequest
|
||||
func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
|
||||
var req structs.NodeDeregisterRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
@@ -137,7 +138,7 @@ func (n *nomadFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
||||
|
||||
func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_status_update"}, time.Now())
|
||||
var req structs.UpdateStatusRequest
|
||||
var req structs.NodeUpdateStatusRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
@@ -149,6 +150,34 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyRegisterJob(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
|
||||
var req structs.JobRegisterRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
if err := n.state.RegisterJob(index, req.Job); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: RegisterJob failed: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_job"}, time.Now())
|
||||
var req structs.JobDeregisterRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
if err := n.state.DeregisterJob(index, req.JobName); err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: DeregisterJob failed: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
// Create a new snapshot
|
||||
snap, err := n.state.Snapshot()
|
||||
|
||||
@@ -51,10 +51,10 @@ func makeLog(buf []byte) *raft.Log {
|
||||
func TestFSM_RegisterNode(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
req := structs.RegisterRequest{
|
||||
req := structs.NodeRegisterRequest{
|
||||
Node: mockNode(),
|
||||
}
|
||||
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -81,10 +81,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
node := mockNode()
|
||||
req := structs.RegisterRequest{
|
||||
req := structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
}
|
||||
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -94,10 +94,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
req2 := structs.DeregisterRequest{
|
||||
req2 := structs.NodeDeregisterRequest{
|
||||
NodeID: node.ID,
|
||||
}
|
||||
buf, err = structs.Encode(structs.DeregisterRequestType, req2)
|
||||
buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -121,10 +121,10 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
node := mockNode()
|
||||
req := structs.RegisterRequest{
|
||||
req := structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
}
|
||||
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -134,7 +134,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
req2 := structs.UpdateStatusRequest{
|
||||
req2 := structs.NodeUpdateStatusRequest{
|
||||
NodeID: node.ID,
|
||||
Status: structs.NodeStatusReady,
|
||||
}
|
||||
|
||||
@@ -16,9 +16,11 @@ var (
|
||||
type MessageType uint8
|
||||
|
||||
const (
|
||||
RegisterRequestType MessageType = iota
|
||||
DeregisterRequestType
|
||||
NodeRegisterRequestType MessageType = iota
|
||||
NodeDeregisterRequestType
|
||||
NodeUpdateStatusRequestType
|
||||
JobRegisterRequestType
|
||||
JobDeregisterRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -108,23 +110,23 @@ type WriteMeta struct {
|
||||
Index uint64
|
||||
}
|
||||
|
||||
// RegisterRequest is used for Client.Register endpoint
|
||||
// NodeRegisterRequest is used for Client.Register endpoint
|
||||
// to register a node as being a schedulable entity.
|
||||
type RegisterRequest struct {
|
||||
type NodeRegisterRequest struct {
|
||||
Node *Node
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// DeregisterRequest is used for Client.Deregister endpoint
|
||||
// NodeDeregisterRequest is used for Client.Deregister endpoint
|
||||
// to deregister a node as being a schedulable entity.
|
||||
type DeregisterRequest struct {
|
||||
type NodeDeregisterRequest struct {
|
||||
NodeID string
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// UpdateStatusRequest is used for Client.UpdateStatus endpoint
|
||||
// to update the status of a node.
|
||||
type UpdateStatusRequest struct {
|
||||
type NodeUpdateStatusRequest struct {
|
||||
NodeID string
|
||||
Status string
|
||||
WriteRequest
|
||||
@@ -136,6 +138,20 @@ type NodeSpecificRequest struct {
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// JobRegisterRequest is used for Job.Register endpoint
|
||||
// to register a job as being a schedulable entity.
|
||||
type JobRegisterRequest struct {
|
||||
Job *Job
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// JobDeregisterRequest is used for Job.Deregister endpoint
|
||||
// to deregister a job as being a schedulable entity.
|
||||
type JobDeregisterRequest struct {
|
||||
JobName string
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// GenericResponse is used to respond to a request where no
|
||||
// specific response information is needed.
|
||||
type GenericResponse struct {
|
||||
|
||||
Reference in New Issue
Block a user