mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
This PR implements a new "System Batch" scheduler type. Jobs can make use of this new scheduler by setting their type to 'sysbatch'. Like the name implies, sysbatch can be thought of as a hybrid between system and batch jobs - it is for running short lived jobs intended to run on every compatible node in the cluster. As with batch jobs, sysbatch jobs can also be periodic and/or parameterized dispatch jobs. A sysbatch job is considered complete when it has been run on all compatible nodes until reaching a terminal state (success or failed on retries). Feasibility and preemption are governed the same as with system jobs. In this PR, the update stanza is not yet supported. The update stanza is sill limited in functionality for the underlying system scheduler, and is not useful yet for sysbatch jobs. Further work in #4740 will improve support for the update stanza and deployments. Closes #2527
468 lines
13 KiB
Go
468 lines
13 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
"github.com/hashicorp/nomad/api"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/raft"
|
|
)
|
|
|
|
// OperatorRequest is used route operator/raft API requests to the implementing
|
|
// functions.
|
|
func (s *HTTPServer) OperatorRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
path := strings.TrimPrefix(req.URL.Path, "/v1/operator/raft/")
|
|
switch {
|
|
case strings.HasPrefix(path, "configuration"):
|
|
return s.OperatorRaftConfiguration(resp, req)
|
|
case strings.HasPrefix(path, "peer"):
|
|
return s.OperatorRaftPeer(resp, req)
|
|
default:
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
|
|
// This supports the stale query mode in case the cluster doesn't have a leader.
|
|
func (s *HTTPServer) OperatorRaftConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
if req.Method != "GET" {
|
|
resp.WriteHeader(http.StatusMethodNotAllowed)
|
|
return nil, nil
|
|
}
|
|
|
|
var args structs.GenericRequest
|
|
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
|
|
return nil, nil
|
|
}
|
|
|
|
var reply structs.RaftConfigurationResponse
|
|
if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
// OperatorRaftPeer supports actions on Raft peers. Currently we only support
|
|
// removing peers by address.
|
|
func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
if req.Method != "DELETE" {
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
|
|
params := req.URL.Query()
|
|
_, hasID := params["id"]
|
|
_, hasAddress := params["address"]
|
|
|
|
if !hasID && !hasAddress {
|
|
return nil, CodedError(http.StatusBadRequest, "Must specify either ?id with the server's ID or ?address with IP:port of peer to remove")
|
|
}
|
|
if hasID && hasAddress {
|
|
return nil, CodedError(http.StatusBadRequest, "Must specify only one of ?id or ?address")
|
|
}
|
|
|
|
if hasID {
|
|
var args structs.RaftPeerByIDRequest
|
|
s.parseWriteRequest(req, &args.WriteRequest)
|
|
|
|
var reply struct{}
|
|
args.ID = raft.ServerID(params.Get("id"))
|
|
if err := s.agent.RPC("Operator.RaftRemovePeerByID", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
var args structs.RaftPeerByAddressRequest
|
|
s.parseWriteRequest(req, &args.WriteRequest)
|
|
|
|
var reply struct{}
|
|
args.Address = raft.ServerAddress(params.Get("address"))
|
|
if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
|
|
// This supports the stale query mode in case the cluster doesn't have a leader.
|
|
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Switch on the method
|
|
switch req.Method {
|
|
case "GET":
|
|
var args structs.GenericRequest
|
|
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
|
|
return nil, nil
|
|
}
|
|
|
|
var reply structs.AutopilotConfig
|
|
if err := s.agent.RPC("Operator.AutopilotGetConfiguration", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out := api.AutopilotConfiguration{
|
|
CleanupDeadServers: reply.CleanupDeadServers,
|
|
LastContactThreshold: reply.LastContactThreshold,
|
|
MaxTrailingLogs: reply.MaxTrailingLogs,
|
|
MinQuorum: reply.MinQuorum,
|
|
ServerStabilizationTime: reply.ServerStabilizationTime,
|
|
EnableRedundancyZones: reply.EnableRedundancyZones,
|
|
DisableUpgradeMigration: reply.DisableUpgradeMigration,
|
|
EnableCustomUpgrades: reply.EnableCustomUpgrades,
|
|
CreateIndex: reply.CreateIndex,
|
|
ModifyIndex: reply.ModifyIndex,
|
|
}
|
|
|
|
return out, nil
|
|
|
|
case "PUT":
|
|
var args structs.AutopilotSetConfigRequest
|
|
s.parseWriteRequest(req, &args.WriteRequest)
|
|
|
|
var conf api.AutopilotConfiguration
|
|
if err := decodeBody(req, &conf); err != nil {
|
|
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing autopilot config: %v", err))
|
|
}
|
|
|
|
args.Config = structs.AutopilotConfig{
|
|
CleanupDeadServers: conf.CleanupDeadServers,
|
|
LastContactThreshold: conf.LastContactThreshold,
|
|
MaxTrailingLogs: conf.MaxTrailingLogs,
|
|
MinQuorum: conf.MinQuorum,
|
|
ServerStabilizationTime: conf.ServerStabilizationTime,
|
|
EnableRedundancyZones: conf.EnableRedundancyZones,
|
|
DisableUpgradeMigration: conf.DisableUpgradeMigration,
|
|
EnableCustomUpgrades: conf.EnableCustomUpgrades,
|
|
}
|
|
|
|
// Check for cas value
|
|
params := req.URL.Query()
|
|
if _, ok := params["cas"]; ok {
|
|
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
|
if err != nil {
|
|
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
|
|
}
|
|
args.Config.ModifyIndex = casVal
|
|
args.CAS = true
|
|
}
|
|
|
|
var reply bool
|
|
if err := s.agent.RPC("Operator.AutopilotSetConfiguration", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Only use the out value if this was a CAS
|
|
if !args.CAS {
|
|
return true, nil
|
|
}
|
|
return reply, nil
|
|
|
|
default:
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
// OperatorServerHealth is used to get the health of the servers in the given Region.
|
|
func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
if req.Method != "GET" {
|
|
return nil, CodedError(404, ErrInvalidMethod)
|
|
}
|
|
|
|
var args structs.GenericRequest
|
|
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
|
|
return nil, nil
|
|
}
|
|
|
|
var reply autopilot.OperatorHealthReply
|
|
if err := s.agent.RPC("Operator.ServerHealth", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Reply with status 429 if something is unhealthy
|
|
if !reply.Healthy {
|
|
resp.WriteHeader(http.StatusTooManyRequests)
|
|
}
|
|
|
|
out := &api.OperatorHealthReply{
|
|
Healthy: reply.Healthy,
|
|
FailureTolerance: reply.FailureTolerance,
|
|
}
|
|
for _, server := range reply.Servers {
|
|
out.Servers = append(out.Servers, api.ServerHealth{
|
|
ID: server.ID,
|
|
Name: server.Name,
|
|
Address: server.Address,
|
|
Version: server.Version,
|
|
Leader: server.Leader,
|
|
SerfStatus: server.SerfStatus.String(),
|
|
LastContact: server.LastContact,
|
|
LastTerm: server.LastTerm,
|
|
LastIndex: server.LastIndex,
|
|
Healthy: server.Healthy,
|
|
Voter: server.Voter,
|
|
StableSince: server.StableSince.Round(time.Second).UTC(),
|
|
})
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// OperatorSchedulerConfiguration is used to inspect the current Scheduler configuration.
|
|
// This supports the stale query mode in case the cluster doesn't have a leader.
|
|
func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Switch on the method
|
|
switch req.Method {
|
|
case "GET":
|
|
return s.schedulerGetConfig(resp, req)
|
|
|
|
case "PUT", "POST":
|
|
return s.schedulerUpdateConfig(resp, req)
|
|
|
|
default:
|
|
return nil, CodedError(405, ErrInvalidMethod)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) schedulerGetConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var args structs.GenericRequest
|
|
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
|
|
return nil, nil
|
|
}
|
|
|
|
var reply structs.SchedulerConfigurationResponse
|
|
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
setMeta(resp, &reply.QueryMeta)
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var args structs.SchedulerSetConfigRequest
|
|
s.parseWriteRequest(req, &args.WriteRequest)
|
|
|
|
var conf api.SchedulerConfiguration
|
|
if err := decodeBody(req, &conf); err != nil {
|
|
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
|
|
}
|
|
|
|
args.Config = structs.SchedulerConfiguration{
|
|
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
|
|
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
|
|
PreemptionConfig: structs.PreemptionConfig{
|
|
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
|
|
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
|
|
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
|
|
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
|
|
}
|
|
|
|
if err := args.Config.Validate(); err != nil {
|
|
return nil, CodedError(http.StatusBadRequest, err.Error())
|
|
}
|
|
|
|
// Check for cas value
|
|
params := req.URL.Query()
|
|
if _, ok := params["cas"]; ok {
|
|
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
|
if err != nil {
|
|
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
|
|
}
|
|
args.Config.ModifyIndex = casVal
|
|
args.CAS = true
|
|
}
|
|
|
|
var reply structs.SchedulerSetConfigurationResponse
|
|
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
|
|
return nil, err
|
|
}
|
|
setIndex(resp, reply.Index)
|
|
return reply, nil
|
|
}
|
|
|
|
func (s *HTTPServer) SnapshotRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
switch req.Method {
|
|
case "GET":
|
|
return s.snapshotSaveRequest(resp, req)
|
|
case "PUT", "POST":
|
|
return s.snapshotRestoreRequest(resp, req)
|
|
default:
|
|
return nil, CodedError(405, ErrInvalidMethod)
|
|
}
|
|
|
|
}
|
|
|
|
func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
args := &structs.SnapshotSaveRequest{}
|
|
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
|
return nil, nil
|
|
}
|
|
|
|
var handler structs.StreamingRpcHandler
|
|
var handlerErr error
|
|
|
|
if server := s.agent.Server(); server != nil {
|
|
handler, handlerErr = server.StreamingRpcHandler("Operator.SnapshotSave")
|
|
} else if client := s.agent.Client(); client != nil {
|
|
handler, handlerErr = client.RemoteStreamingRpcHandler("Operator.SnapshotSave")
|
|
} else {
|
|
handlerErr = fmt.Errorf("misconfigured connection")
|
|
}
|
|
|
|
if handlerErr != nil {
|
|
return nil, CodedError(500, handlerErr.Error())
|
|
}
|
|
|
|
httpPipe, handlerPipe := net.Pipe()
|
|
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
|
|
|
// Create a goroutine that closes the pipe if the connection closes.
|
|
ctx, cancel := context.WithCancel(req.Context())
|
|
defer cancel()
|
|
go func() {
|
|
<-ctx.Done()
|
|
httpPipe.Close()
|
|
}()
|
|
|
|
errCh := make(chan HTTPCodedError, 2)
|
|
go func() {
|
|
defer cancel()
|
|
|
|
// Send the request
|
|
if err := encoder.Encode(args); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
var res structs.SnapshotSaveResponse
|
|
if err := decoder.Decode(&res); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
if res.ErrorMsg != "" {
|
|
errCh <- CodedError(res.ErrorCode, res.ErrorMsg)
|
|
return
|
|
}
|
|
|
|
resp.Header().Add("Digest", res.SnapshotChecksum)
|
|
|
|
_, err := io.Copy(resp, httpPipe)
|
|
if err != nil &&
|
|
err != io.EOF &&
|
|
!strings.Contains(err.Error(), "closed") &&
|
|
!strings.Contains(err.Error(), "EOF") {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
errCh <- nil
|
|
}()
|
|
|
|
handler(handlerPipe)
|
|
cancel()
|
|
codedErr := <-errCh
|
|
|
|
return nil, codedErr
|
|
}
|
|
|
|
func (s *HTTPServer) snapshotRestoreRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
args := &structs.SnapshotRestoreRequest{}
|
|
s.parseWriteRequest(req, &args.WriteRequest)
|
|
|
|
var handler structs.StreamingRpcHandler
|
|
var handlerErr error
|
|
|
|
if server := s.agent.Server(); server != nil {
|
|
handler, handlerErr = server.StreamingRpcHandler("Operator.SnapshotRestore")
|
|
} else if client := s.agent.Client(); client != nil {
|
|
handler, handlerErr = client.RemoteStreamingRpcHandler("Operator.SnapshotRestore")
|
|
} else {
|
|
handlerErr = fmt.Errorf("misconfigured connection")
|
|
}
|
|
|
|
if handlerErr != nil {
|
|
return nil, CodedError(500, handlerErr.Error())
|
|
}
|
|
|
|
httpPipe, handlerPipe := net.Pipe()
|
|
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
|
|
|
// Create a goroutine that closes the pipe if the connection closes.
|
|
ctx, cancel := context.WithCancel(req.Context())
|
|
defer cancel()
|
|
go func() {
|
|
<-ctx.Done()
|
|
httpPipe.Close()
|
|
}()
|
|
|
|
errCh := make(chan HTTPCodedError, 2)
|
|
go func() {
|
|
defer cancel()
|
|
|
|
// Send the request
|
|
if err := encoder.Encode(args); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
var wrapper cstructs.StreamErrWrapper
|
|
bytes := make([]byte, 1024)
|
|
|
|
for {
|
|
n, err := req.Body.Read(bytes)
|
|
if n > 0 {
|
|
wrapper.Payload = bytes[:n]
|
|
err := encoder.Encode(wrapper)
|
|
if err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
}
|
|
if err != nil {
|
|
wrapper.Payload = nil
|
|
wrapper.Error = &cstructs.RpcError{Message: err.Error()}
|
|
err := encoder.Encode(wrapper)
|
|
if err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
var res structs.SnapshotRestoreResponse
|
|
if err := decoder.Decode(&res); err != nil {
|
|
errCh <- CodedError(500, err.Error())
|
|
return
|
|
}
|
|
|
|
if res.ErrorMsg != "" {
|
|
errCh <- CodedError(res.ErrorCode, res.ErrorMsg)
|
|
return
|
|
}
|
|
|
|
errCh <- nil
|
|
}()
|
|
|
|
handler(handlerPipe)
|
|
cancel()
|
|
codedErr := <-errCh
|
|
|
|
return nil, codedErr
|
|
}
|