Support for new scheduler config API, first use case is to disable preemption

This commit is contained in:
Preetha Appan
2018-09-27 23:27:38 -05:00
parent 51c5bae393
commit 784b96c104
13 changed files with 539 additions and 0 deletions

View File

@@ -1,5 +1,7 @@
package api
import "strconv"
// Operator can be used to perform low-level operator tasks for Nomad.
type Operator struct {
c *Client
@@ -106,3 +108,46 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
resp.Body.Close()
return nil
}
type SchedulerConfiguration struct {
// EnablePreemption specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.
EnablePreemption bool
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
}
// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfiguration, *QueryMeta, error) {
var resp SchedulerConfiguration
qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}
// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q)
if err != nil {
return nil, err
}
return wm, nil
}
// SchedulerCASConfiguration is used to perform a Check-And-Set update on the
// Scheduler configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
if err != nil {
return false, nil, err
}
return out, wm, nil
}

View File

@@ -3,6 +3,9 @@ package api
import (
"strings"
"testing"
"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/require"
)
func TestOperator_RaftGetConfiguration(t *testing.T) {
@@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
operator := c.Operator()
var config *SchedulerConfiguration
retry.Run(t, func(r *retry.R) {
var err error
config, _, err = operator.SchedulerGetConfiguration(nil)
r.Check(err)
})
require.False(config.EnablePreemption)
// Change a config setting
newConf := &SchedulerConfiguration{EnablePreemption: true}
_, err := operator.SchedulerSetConfiguration(newConf, nil)
require.Nil(err)
config, _, err = operator.SchedulerGetConfiguration(nil)
require.Nil(err)
require.True(config.EnablePreemption)
}
func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
operator := c.Operator()
var config *SchedulerConfiguration
retry.Run(t, func(r *retry.R) {
var err error
config, _, err = operator.SchedulerGetConfiguration(nil)
r.Check(err)
})
require.False(config.EnablePreemption)
// Pass an invalid ModifyIndex
{
newConf := &SchedulerConfiguration{
EnablePreemption: true,
ModifyIndex: config.ModifyIndex - 1,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.False(resp)
}
// Pass a valid ModifyIndex
{
newConf := &SchedulerConfiguration{
EnablePreemption: true,
ModifyIndex: config.ModifyIndex,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.True(resp)
}
}

View File

@@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))
s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))
if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {

View File

@@ -208,3 +208,67 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re
return out, nil
}
// OperatorSchedulerConfiguration is used to inspect the current Scheduler configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}
var reply structs.SchedulerConfiguration
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}
out := api.SchedulerConfiguration{
EnablePreemption: reply.EnablePreemption,
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}
return out, nil
case "PUT":
var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)
var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing autopilot config: %v", err))
}
args.Config = structs.SchedulerConfiguration{
EnablePreemption: conf.EnablePreemption,
}
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
args.Config.ModifyIndex = casVal
args.CAS = true
}
var reply bool
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
}
return reply, nil
default:
return nil, CodedError(404, ErrInvalidMethod)
}
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHTTP_OperatorRaftConfiguration(t *testing.T) {
@@ -257,3 +258,94 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
})
})
}
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/config", body)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
out, ok := obj.(api.SchedulerConfiguration)
require.True(ok)
require.False(out.EnablePreemption)
})
}
func TestOperator_SchedulerSetConfiguration(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body)
resp := httptest.NewRecorder()
_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: s.Config.Region,
},
}
var reply structs.SchedulerConfiguration
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.EnablePreemption)
})
}
func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body)
resp := httptest.NewRecorder()
_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: s.Config.Region,
},
}
var reply structs.SchedulerConfiguration
if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
require.True(reply.EnablePreemption)
// Create a CAS request, bad index
{
buf := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex-1), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.False(obj.(bool))
}
// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.True(obj.(bool))
}
// Verify the update
if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
require.True(reply.EnablePreemption)
})
}

View File

@@ -247,6 +247,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
case structs.BatchNodeUpdateDrainRequestType:
return n.applyBatchDrainUpdate(buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
}
// Check enterprise only message types.
@@ -1833,6 +1835,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
return nil
}
func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} {
var req structs.SchedulerSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now())
if req.CAS {
act, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config)
if err != nil {
return err
}
return act
}
return n.state.SchedulerSetConfig(index, &req.Config)
}
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.

View File

@@ -2825,3 +2825,47 @@ func TestFSM_Autopilot(t *testing.T) {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
}
func TestFSM_SchedulerConfig(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
require := require.New(t)
// Set the autopilot config using a request.
req := structs.SchedulerSetConfigRequest{
Config: structs.SchedulerConfiguration{
EnablePreemption: true,
},
}
buf, err := structs.Encode(structs.SchedulerConfigRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
// Verify key is set directly in the state store.
_, config, err := fsm.state.SchedulerConfig()
require.Nil(err)
require.Equal(config.EnablePreemption, req.Config.EnablePreemption)
// Now use CAS and provide an old index
req.CAS = true
req.Config.EnablePreemption = false
req.Config.ModifyIndex = config.ModifyIndex - 1
buf, err = structs.Encode(structs.SchedulerConfigRequestType, req)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
_, config, err = fsm.state.SchedulerConfig()
require.Nil(err)
// Verify that preemption is still enabled
require.True(config.EnablePreemption)
}

View File

@@ -187,6 +187,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.getOrCreateAutopilotConfig()
s.autopilot.Start()
// Initialize scheduler configuration
s.getOrCreateSchedulerConfig()
// Enable the plan queue, since we are now the leader
s.planQueue.SetEnabled(true)
@@ -1230,3 +1233,25 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}
// getOrCreateSchedulerConfig is used to get the scheduler config, initializing it if necessary
func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
state := s.fsm.State()
_, config, err := state.SchedulerConfig()
if err != nil {
s.logger.Named("core").Error("failed to get scheduler config", "error", err)
return nil
}
if config != nil {
return config
}
config = &structs.SchedulerConfiguration{EnablePreemption: false}
req := structs.SchedulerSetConfigRequest{Config: *config}
if _, _, err = s.raftApply(structs.SchedulerConfigRequestType, req); err != nil {
s.logger.Named("core").Error("failed to initialize config", "error", err)
return nil
}
return config
}

View File

@@ -284,3 +284,64 @@ func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot.
return nil
}
// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *bool) error {
if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done {
return err
}
// This action requires operator write access.
rule, err := op.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
if rule != nil && !rule.AllowOperatorWrite() {
return structs.ErrPermissionDenied
}
// Apply the update
resp, _, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args)
if err != nil {
op.logger.Error("failed applying Scheduler configuration", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
// SchedulerGetConfiguration is used to retrieve the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, reply *structs.SchedulerConfiguration) error {
if done, err := op.srv.forward("Operator.SchedulerGetConfiguration", args, args, reply); done {
return err
}
// This action requires operator read access.
rule, err := op.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
if rule != nil && !rule.AllowOperatorRead() {
return structs.ErrPermissionDenied
}
state := op.srv.fsm.State()
_, config, err := state.SchedulerConfig()
if err != nil {
return err
}
if config == nil {
return fmt.Errorf("scheduler config not initialized yet")
}
*reply = *config
return nil
}

View File

@@ -44,6 +44,7 @@ func init() {
aclPolicyTableSchema,
aclTokenTableSchema,
autopilotConfigTableSchema,
schedulerConfigTableSchema,
}...)
}
@@ -599,3 +600,21 @@ func aclTokenTableSchema() *memdb.TableSchema {
},
}
}
// schedulerConfigTableSchema returns the MemDB schema for the scheduler config table.
// This table is used to store configuration options for the scheduler
func schedulerConfigTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "scheduler_config",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: true,
Unique: true,
Indexer: &memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) { return true, nil },
},
},
},
}
}

View File

@@ -3991,3 +3991,81 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}
}
}
// SchedulerConfig is used to get the current Scheduler configuration.
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the scheduler config
c, err := tx.First("scheduler_config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
}
config, ok := c.(*structs.SchedulerConfiguration)
if !ok {
return 0, nil, nil
}
return config.ModifyIndex, config, nil
}
// SchedulerSetConfig is used to set the current Scheduler configuration.
func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error {
tx := s.db.Txn(true)
defer tx.Abort()
s.schedulerSetConfigTxn(idx, tx, config)
tx.Commit()
return nil
}
// SchedulerCASConfig is used to try updating the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
return false, fmt.Errorf("failed scheduler config lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.SchedulerConfiguration)
if !ok || e.ModifyIndex != cidx {
return false, nil
}
s.schedulerSetConfigTxn(idx, tx, config)
tx.Commit()
return true, nil
}
func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error {
// Check for an existing config
existing, err := tx.First("scheduler_config", "id")
if err != nil {
return fmt.Errorf("failed scheduler config lookup: %s", err)
}
// Set the indexes.
if existing != nil {
config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex
} else {
config.CreateIndex = idx
}
config.ModifyIndex = idx
if err := tx.Insert("scheduler_config", config); err != nil {
return fmt.Errorf("failed updating scheduler config: %s", err)
}
return nil
}

View File

@@ -119,3 +119,26 @@ type AutopilotConfig struct {
CreateIndex uint64
ModifyIndex uint64
}
type SchedulerConfiguration struct {
// EnablePreemption specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.
EnablePreemption bool
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
}
// SchedulerSetConfigRequest is used by the Operator endpoint to update the
// current Scheduler configuration of the cluster.
type SchedulerSetConfigRequest struct {
// Config is the new Scheduler configuration to use.
Config SchedulerConfiguration
// CAS controls whether to use check-and-set semantics for this request.
CAS bool
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}

View File

@@ -81,6 +81,7 @@ const (
AllocUpdateDesiredTransitionRequestType
NodeUpdateEligibilityRequestType
BatchNodeUpdateDrainRequestType
SchedulerConfigRequestType
)
const (