From 784b96c104373bbf0005ed541d37e1411becb4ea Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 27 Sep 2018 23:27:38 -0500 Subject: [PATCH] Support for new scheduler config API, first use case is to disable preemption --- api/operator.go | 45 ++++++++++++ api/operator_test.go | 66 ++++++++++++++++++ command/agent/http.go | 2 + command/agent/operator_endpoint.go | 64 +++++++++++++++++ command/agent/operator_endpoint_test.go | 92 +++++++++++++++++++++++++ nomad/fsm.go | 19 +++++ nomad/fsm_test.go | 44 ++++++++++++ nomad/leader.go | 25 +++++++ nomad/operator_endpoint.go | 61 ++++++++++++++++ nomad/state/schema.go | 19 +++++ nomad/state/state_store.go | 78 +++++++++++++++++++++ nomad/structs/operator.go | 23 +++++++ nomad/structs/structs.go | 1 + 13 files changed, 539 insertions(+) diff --git a/api/operator.go b/api/operator.go index be2ba8005..85a28fe8d 100644 --- a/api/operator.go +++ b/api/operator.go @@ -1,5 +1,7 @@ package api +import "strconv" + // Operator can be used to perform low-level operator tasks for Nomad. type Operator struct { c *Client @@ -106,3 +108,46 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error { resp.Body.Close() return nil } + +type SchedulerConfiguration struct { + // EnablePreemption specifies whether to enable eviction of lower + // priority jobs to place higher priority jobs. + EnablePreemption bool + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. + CreateIndex uint64 + ModifyIndex uint64 +} + +// SchedulerGetConfiguration is used to query the current Scheduler configuration. +func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfiguration, *QueryMeta, error) { + var resp SchedulerConfiguration + qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// SchedulerSetConfiguration is used to set the current Scheduler configuration. 
+func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) { + var out bool + wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// SchedulerCASConfiguration is used to perform a Check-And-Set update on the +// Scheduler configuration. The ModifyIndex value will be respected. Returns +// true on success or false on failures. +func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) { + var out bool + wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q) + if err != nil { + return false, nil, err + } + + return out, wm, nil +} diff --git a/api/operator_test.go b/api/operator_test.go index 5b13fc66c..277d85afe 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -3,6 +3,9 @@ package api import ( "strings" "testing" + + "github.com/hashicorp/consul/testutil/retry" + "github.com/stretchr/testify/require" ) func TestOperator_RaftGetConfiguration(t *testing.T) { @@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) { + t.Parallel() + require := require.New(t) + c, s := makeClient(t, nil, nil) + defer s.Stop() + + operator := c.Operator() + var config *SchedulerConfiguration + retry.Run(t, func(r *retry.R) { + var err error + config, _, err = operator.SchedulerGetConfiguration(nil) + r.Check(err) + }) + require.False(config.EnablePreemption) + + // Change a config setting + newConf := &SchedulerConfiguration{EnablePreemption: true} + _, err := operator.SchedulerSetConfiguration(newConf, nil) + require.Nil(err) + + config, _, err = operator.SchedulerGetConfiguration(nil) + require.Nil(err) + require.True(config.EnablePreemption) +} + +func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) 
{ + t.Parallel() + require := require.New(t) + c, s := makeClient(t, nil, nil) + defer s.Stop() + + operator := c.Operator() + var config *SchedulerConfiguration + retry.Run(t, func(r *retry.R) { + var err error + config, _, err = operator.SchedulerGetConfiguration(nil) + r.Check(err) + }) + require.False(config.EnablePreemption) + + // Pass an invalid ModifyIndex + { + newConf := &SchedulerConfiguration{ + EnablePreemption: true, + ModifyIndex: config.ModifyIndex - 1, + } + resp, _, err := operator.SchedulerCASConfiguration(newConf, nil) + require.Nil(err) + require.False(resp) + } + + // Pass a valid ModifyIndex + { + newConf := &SchedulerConfiguration{ + EnablePreemption: true, + ModifyIndex: config.ModifyIndex, + } + resp, _, err := operator.SchedulerCASConfiguration(newConf, nil) + require.Nil(err) + require.True(resp) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 857f5f8bc..d1bb0e74a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest)) s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries)) + s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration)) + if uiEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) } else { diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 02b25c218..9e9ebe9b3 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -208,3 +208,67 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re return out, nil } + +// OperatorSchedulerConfiguration is used to inspect the current Scheduler configuration. +// This supports the stale query mode in case the cluster doesn't have a leader. 
+func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Switch on the method + switch req.Method { + case "GET": + var args structs.GenericRequest + if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done { + return nil, nil + } + + var reply structs.SchedulerConfiguration + if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil { + return nil, err + } + + out := api.SchedulerConfiguration{ + EnablePreemption: reply.EnablePreemption, + CreateIndex: reply.CreateIndex, + ModifyIndex: reply.ModifyIndex, + } + + return out, nil + + case "PUT": + var args structs.SchedulerSetConfigRequest + s.parseWriteRequest(req, &args.WriteRequest) + + var conf api.SchedulerConfiguration + if err := decodeBody(req, &conf); err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err)) + } + + args.Config = structs.SchedulerConfiguration{ + EnablePreemption: conf.EnablePreemption, + } + + // Check for cas value + params := req.URL.Query() + if _, ok := params["cas"]; ok { + casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64) + if err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err)) + } + args.Config.ModifyIndex = casVal + args.CAS = true + } + + var reply bool + if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil { + return nil, err + } + + // Only use the out value if this was a CAS + if !args.CAS { + return true, nil + } + return reply, nil + + default: + return nil, CodedError(404, ErrInvalidMethod) + } +} diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 2d8486765..2bf82d790 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/api" 
"github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestHTTP_OperatorRaftConfiguration(t *testing.T) { @@ -257,3 +258,94 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) { }) }) } + +func TestOperator_SchedulerGetConfiguration(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + require := require.New(t) + body := bytes.NewBuffer(nil) + req, _ := http.NewRequest("GET", "/v1/operator/scheduler/config", body) + resp := httptest.NewRecorder() + obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.Equal(200, resp.Code) + out, ok := obj.(api.SchedulerConfiguration) + require.True(ok) + require.False(out.EnablePreemption) + }) +} + +func TestOperator_SchedulerSetConfiguration(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + require := require.New(t) + body := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`)) + req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body) + resp := httptest.NewRecorder() + _, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.Equal(200, resp.Code) + + args := structs.GenericRequest{ + QueryOptions: structs.QueryOptions{ + Region: s.Config.Region, + }, + } + + var reply structs.SchedulerConfiguration + err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply) + require.Nil(err) + require.True(reply.EnablePreemption) + }) +} + +func TestOperator_SchedulerCASConfiguration(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + require := require.New(t) + body := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`)) + req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body) + resp := httptest.NewRecorder() + _, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.Equal(200, resp.Code) + + args := structs.GenericRequest{ + QueryOptions: structs.QueryOptions{ 
+ Region: s.Config.Region, + }, + } + + var reply structs.SchedulerConfiguration + if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + require.True(reply.EnablePreemption) + + // Create a CAS request, bad index + { + buf := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`)) + req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex-1), buf) + resp := httptest.NewRecorder() + obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.False(obj.(bool)) + } + + // Create a CAS request, good index + { + buf := bytes.NewBuffer([]byte(`{"EnablePreemption": true}`)) + req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex), buf) + resp := httptest.NewRecorder() + obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.True(obj.(bool)) + } + + // Verify the update + if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + require.True(reply.EnablePreemption) + }) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index ba02bf063..bba9d0e72 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -247,6 +247,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyNodeEligibilityUpdate(buf[1:], log.Index) case structs.BatchNodeUpdateDrainRequestType: return n.applyBatchDrainUpdate(buf[1:], log.Index) + case structs.SchedulerConfigRequestType: + return n.applySchedulerConfigUpdate(buf[1:], log.Index) } // Check enterprise only message types. 
@@ -1833,6 +1835,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, return nil } +func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} { + var req structs.SchedulerSetConfigRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now()) + + if req.CAS { + act, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config) + if err != nil { + return err + } + return act + } + return n.state.SchedulerSetConfig(index, &req.Config) +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index c8802bf9b..48f50271a 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2825,3 +2825,47 @@ func TestFSM_Autopilot(t *testing.T) { t.Fatalf("bad: %v", config.CleanupDeadServers) } } + +func TestFSM_SchedulerConfig(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + require := require.New(t) + + // Set the scheduler config using a request. + req := structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + EnablePreemption: true, + }, + } + buf, err := structs.Encode(structs.SchedulerConfigRequestType, req) + require.Nil(err) + + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + + // Verify key is set directly in the state store. 
+ _, config, err := fsm.state.SchedulerConfig() + require.Nil(err) + + require.Equal(config.EnablePreemption, req.Config.EnablePreemption) + + // Now use CAS and provide an old index + req.CAS = true + req.Config.EnablePreemption = false + req.Config.ModifyIndex = config.ModifyIndex - 1 + buf, err = structs.Encode(structs.SchedulerConfigRequestType, req) + require.Nil(err) + + resp = fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + + _, config, err = fsm.state.SchedulerConfig() + require.Nil(err) + // Verify that preemption is still enabled + require.True(config.EnablePreemption) +} diff --git a/nomad/leader.go b/nomad/leader.go index ef0e622cf..f7e8cd989 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -187,6 +187,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.getOrCreateAutopilotConfig() s.autopilot.Start() + // Initialize scheduler configuration + s.getOrCreateSchedulerConfig() + // Enable the plan queue, since we are now the leader s.planQueue.SetEnabled(true) @@ -1230,3 +1233,25 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig { return config } + +// getOrCreateSchedulerConfig is used to get the scheduler config, initializing it if necessary +func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration { + state := s.fsm.State() + _, config, err := state.SchedulerConfig() + if err != nil { + s.logger.Named("core").Error("failed to get scheduler config", "error", err) + return nil + } + if config != nil { + return config + } + + config = &structs.SchedulerConfiguration{EnablePreemption: false} + req := structs.SchedulerSetConfigRequest{Config: *config} + if _, _, err = s.raftApply(structs.SchedulerConfigRequestType, req); err != nil { + s.logger.Named("core").Error("failed to initialize config", "error", err) + return nil + } + + return config +} diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 39615b3fa..a04f12843 100644 
--- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -284,3 +284,64 @@ func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot. return nil } + +// SchedulerSetConfiguration is used to set the current Scheduler configuration. +func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *bool) error { + if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator write access. + rule, err := op.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + if rule != nil && !rule.AllowOperatorWrite() { + return structs.ErrPermissionDenied + } + + // Apply the update + resp, _, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args) + if err != nil { + op.logger.Error("failed applying Scheduler configuration", "error", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a bool. + if respBool, ok := resp.(bool); ok { + *reply = respBool + } + return nil +} + +// SchedulerGetConfiguration is used to retrieve the current Scheduler configuration. +func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, reply *structs.SchedulerConfiguration) error { + if done, err := op.srv.forward("Operator.SchedulerGetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. 
+ rule, err := op.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + if rule != nil && !rule.AllowOperatorRead() { + return structs.ErrPermissionDenied + } + + state := op.srv.fsm.State() + _, config, err := state.SchedulerConfig() + if err != nil { + return err + } + if config == nil { + return fmt.Errorf("scheduler config not initialized yet") + } + + *reply = *config + + return nil +} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 8b77de603..7c76c0359 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -44,6 +44,7 @@ func init() { aclPolicyTableSchema, aclTokenTableSchema, autopilotConfigTableSchema, + schedulerConfigTableSchema, }...) } @@ -599,3 +600,21 @@ func aclTokenTableSchema() *memdb.TableSchema { }, } } + +// schedulerConfigTableSchema returns the MemDB schema for the scheduler config table. +// This table is used to store configuration options for the scheduler +func schedulerConfigTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "scheduler_config", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: true, + Unique: true, + Indexer: &memdb.ConditionalIndex{ + Conditional: func(obj interface{}) (bool, error) { return true, nil }, + }, + }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 71db85d57..b228cfe10 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3991,3 +3991,81 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } } } + +// SchedulerConfig is used to get the current Scheduler configuration. 
+func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the scheduler config + c, err := tx.First("scheduler_config", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err) + } + + config, ok := c.(*structs.SchedulerConfiguration) + if !ok { + return 0, nil, nil + } + + return config.ModifyIndex, config, nil +} + +// SchedulerSetConfig is used to set the current Scheduler configuration. +func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error { + tx := s.db.Txn(true) + defer tx.Abort() + + s.schedulerSetConfigTxn(idx, tx, config) + + tx.Commit() + return nil +} + +// SchedulerCASConfig is used to try updating the scheduler configuration with a +// given Raft index. If the CAS index specified is not equal to the last observed index +// for the config, then the call is a noop and false is returned. +func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for an existing config + existing, err := tx.First("scheduler_config", "id") + if err != nil { + return false, fmt.Errorf("failed scheduler config lookup: %s", err) + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + e, ok := existing.(*structs.SchedulerConfiguration) + if !ok || e.ModifyIndex != cidx { + return false, nil + } + + s.schedulerSetConfigTxn(idx, tx, config) + + tx.Commit() + return true, nil +} + +func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error { + // Check for an existing config + existing, err := tx.First("scheduler_config", "id") + if err != nil { + return fmt.Errorf("failed scheduler config lookup: %s", err) + } + + // Set the indexes. 
+ if existing != nil { + config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex + } else { + config.CreateIndex = idx + } + config.ModifyIndex = idx + + if err := tx.Insert("scheduler_config", config); err != nil { + return fmt.Errorf("failed updating scheduler config: %s", err) + } + return nil +} diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index ecd7f97d4..23d797916 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -119,3 +119,26 @@ type AutopilotConfig struct { CreateIndex uint64 ModifyIndex uint64 } + +type SchedulerConfiguration struct { + // EnablePreemption specifies whether to enable eviction of lower + // priority jobs to place higher priority jobs. + EnablePreemption bool + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. + CreateIndex uint64 + ModifyIndex uint64 +} + +// SchedulerSetConfigRequest is used by the Operator endpoint to update the +// current Scheduler configuration of the cluster. +type SchedulerSetConfigRequest struct { + // Config is the new Scheduler configuration to use. + Config SchedulerConfiguration + + // CAS controls whether to use check-and-set semantics for this request. + CAS bool + + // WriteRequest holds the ACL token to go along with this request. + WriteRequest +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 868514cf9..b1c241dee 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -81,6 +81,7 @@ const ( AllocUpdateDesiredTransitionRequestType NodeUpdateEligibilityRequestType BatchNodeUpdateDrainRequestType + SchedulerConfigRequestType ) const (