mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 17:35:43 +03:00
node_endpoint preserve both messages as rpcs and in raft
This commit is contained in:
@@ -245,65 +245,93 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deregister is the deprecated single deregistration endpoint
|
||||
// Deregister is used to remove a client from the cluster. If a client should
|
||||
// just be made unavailable for scheduling, a status update is preferred.
|
||||
func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error {
|
||||
return n.DeregisterBatch(&structs.NodeBatchDeregisterRequest{
|
||||
NodeIDs: []string{args.NodeID},
|
||||
WriteRequest: args.WriteRequest,
|
||||
}, reply)
|
||||
}
|
||||
|
||||
// DeregisterBatch is used to remove client nodes from the cluster. If a client should just
|
||||
// be made unavailable for scheduling, a status update is preferred.
|
||||
func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
|
||||
if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
|
||||
if done, err := n.srv.forward("Node.Deregister", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())
|
||||
|
||||
// Check node permissions
|
||||
if args.NodeID == "" {
|
||||
return fmt.Errorf("missing node ID for client deregistration")
|
||||
}
|
||||
|
||||
// deregister takes a batch
|
||||
repack := &structs.NodeBatchDeregisterRequest{
|
||||
NodeIDs: []string{args.NodeID},
|
||||
WriteRequest: args.WriteRequest,
|
||||
}
|
||||
|
||||
return n.deregister(repack, reply, func() (interface{}, uint64, error) {
|
||||
return n.srv.raftApply(structs.NodeDeregisterRequestType, args)
|
||||
})
|
||||
}
|
||||
|
||||
// BatchDeregister is used to remove client nodes from the cluster.
|
||||
func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
|
||||
if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now())
|
||||
|
||||
if len(args.NodeIDs) == 0 {
|
||||
return fmt.Errorf("missing node IDs for client deregistration")
|
||||
}
|
||||
|
||||
return n.deregister(args, reply, func() (interface{}, uint64, error) {
|
||||
return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
|
||||
})
|
||||
}
|
||||
|
||||
// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister
|
||||
func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
|
||||
reply *structs.NodeUpdateResponse,
|
||||
raftApplyFn func() (interface{}, uint64, error),
|
||||
) error {
|
||||
// Check request permissions
|
||||
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
|
||||
return err
|
||||
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
|
||||
return structs.ErrPermissionDenied
|
||||
}
|
||||
|
||||
// Verify the arguments
|
||||
if len(args.NodeIDs) == 0 {
|
||||
return fmt.Errorf("missing node IDs for client deregistration")
|
||||
}
|
||||
|
||||
// Open state handles
|
||||
// Look for the node
|
||||
snap, err := n.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
// Assert that the state contains the nodes
|
||||
for _, nodeID := range args.NodeIDs {
|
||||
node, err := snap.NodeByID(ws, nodeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
|
||||
return err
|
||||
}
|
||||
if node == nil {
|
||||
return fmt.Errorf("node not found: %s", nodeID)
|
||||
return fmt.Errorf("node not found")
|
||||
}
|
||||
}
|
||||
|
||||
// Commit this update to Raft, before we clear the heartbeatTimer so that failure
|
||||
// leaves the node running
|
||||
_, index, err := n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
|
||||
// Commit this update via Raft
|
||||
_, index, err := raftApplyFn()
|
||||
if err != nil {
|
||||
n.logger.Error("deregister failed", "error", err)
|
||||
n.logger.Error("raft message failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, nodeID := range args.NodeIDs {
|
||||
// Clear the heartbeat timer if any
|
||||
n.srv.clearHeartbeatTimer(nodeID)
|
||||
|
||||
// If there are any Vault accessors on the node, revoke them
|
||||
// Create the evaluations for this node
|
||||
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
|
||||
if err != nil {
|
||||
n.logger.Error("eval creation failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Determine if there are any Vault accessors on the node
|
||||
accessors, err := snap.VaultAccessorsByNode(ws, nodeID)
|
||||
if err != nil {
|
||||
n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err)
|
||||
@@ -318,24 +346,15 @@ func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *
|
||||
}
|
||||
}
|
||||
|
||||
// Create the evaluations for these nodes
|
||||
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
|
||||
if err != nil {
|
||||
n.logger.Error("eval creation failed", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
reply.EvalIDs = append(reply.EvalIDs, evalIDs...)
|
||||
// Set the reply evalIndex only the first time
|
||||
// Set the reply eval create index just the first time
|
||||
if reply.EvalCreateIndex == 0 {
|
||||
reply.EvalCreateIndex = evalIndex
|
||||
}
|
||||
}
|
||||
|
||||
// Setup the reply
|
||||
reply.NodeModifyIndex = index
|
||||
reply.Index = index
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user